...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/gridfs_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"bytes"
    11  	"context"
    12  	"io"
    13  	"math/rand"
    14  	"runtime"
    15  	"testing"
    16  	"time"
    17  
    18  	"go.mongodb.org/mongo-driver/bson"
    19  	"go.mongodb.org/mongo-driver/bson/primitive"
    20  	"go.mongodb.org/mongo-driver/event"
    21  	"go.mongodb.org/mongo-driver/internal/assert"
    22  	"go.mongodb.org/mongo-driver/internal/israce"
    23  	"go.mongodb.org/mongo-driver/mongo"
    24  	"go.mongodb.org/mongo-driver/mongo/gridfs"
    25  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    26  	"go.mongodb.org/mongo-driver/mongo/options"
    27  )
    28  
    29  func TestGridFS(x *testing.T) {
    30  	mt := mtest.New(x, noClientOpts)
    31  
    32  	mt.Run("skipping download", func(mt *mtest.T) {
    33  		data := []byte("abc.def.ghi")
    34  		var chunkSize int32 = 4
    35  
    36  		testcases := []struct {
    37  			name string
    38  
    39  			read              int
    40  			skip              int64
    41  			expectedSkipN     int64
    42  			expectedSkipErr   error
    43  			expectedRemaining int
    44  		}{
    45  			{
    46  				"read 0, skip 0", 0, 0, 0, nil, 11,
    47  			},
    48  			{
    49  				"read 0, skip to end of chunk", 0, 4, 4, nil, 7,
    50  			},
    51  			{
    52  				"read 0, skip 1", 0, 1, 1, nil, 10,
    53  			},
    54  			{
    55  				"read 1, skip to end of chunk", 1, 3, 3, nil, 7,
    56  			},
    57  			{
    58  				"read all, skip beyond", 11, 1, 0, nil, 0,
    59  			},
    60  			{
    61  				"skip all", 0, 11, 11, nil, 0,
    62  			},
    63  			{
    64  				"read 1, skip to last chunk", 1, 8, 8, nil, 2,
    65  			},
    66  			{
    67  				"read to last chunk, skip to end", 9, 2, 2, nil, 0,
    68  			},
    69  			{
    70  				"read to last chunk, skip beyond", 9, 4, 2, nil, 0,
    71  			},
    72  		}
    73  
    74  		for _, tc := range testcases {
    75  			mt.Run(tc.name, func(mt *mtest.T) {
    76  				bucket, err := gridfs.NewBucket(mt.DB, options.GridFSBucket().SetChunkSizeBytes(chunkSize))
    77  				assert.Nil(mt, err, "NewBucket error: %v", err)
    78  
    79  				ustream, err := bucket.OpenUploadStream("foo")
    80  				assert.Nil(mt, err, "OpenUploadStream error: %v", err)
    81  
    82  				id := ustream.FileID
    83  				_, err = ustream.Write(data)
    84  				assert.Nil(mt, err, "Write error: %v", err)
    85  				err = ustream.Close()
    86  				assert.Nil(mt, err, "Close error: %v", err)
    87  
    88  				dstream, err := bucket.OpenDownloadStream(id)
    89  				assert.Nil(mt, err, "OpenDownloadStream error")
    90  				dst := make([]byte, tc.read)
    91  				_, err = dstream.Read(dst)
    92  				assert.Nil(mt, err, "Read error: %v", err)
    93  
    94  				n, err := dstream.Skip(tc.skip)
    95  				assert.Equal(mt, tc.expectedSkipErr, err, "expected error on Skip: %v, got %v", tc.expectedSkipErr, err)
    96  				assert.Equal(mt, tc.expectedSkipN, n, "expected Skip to return: %v, got %v", tc.expectedSkipN, n)
    97  
    98  				// Read the rest.
    99  				dst = make([]byte, len(data))
   100  				remaining, err := dstream.Read(dst)
   101  				if err != nil {
   102  					assert.Equal(mt, err, io.EOF, "unexpected Read error: %v", err)
   103  				}
   104  				assert.Equal(mt, tc.expectedRemaining, remaining, "expected remaining data to be: %v, got %v", tc.expectedRemaining, remaining)
   105  			})
   106  		}
   107  	})
   108  
   109  	mt.Run("index creation", func(mt *mtest.T) {
   110  		// Unit tests showing that UploadFromStream creates indexes on the chunks and files collections.
   111  		bucket, err := gridfs.NewBucket(mt.DB)
   112  		assert.Nil(mt, err, "NewBucket error: %v", err)
   113  		err = bucket.SetWriteDeadline(time.Now().Add(5 * time.Second))
   114  		assert.Nil(mt, err, "SetWriteDeadline error: %v", err)
   115  
   116  		byteData := []byte("Hello, world!")
   117  		r := bytes.NewReader(byteData)
   118  
   119  		_, err = bucket.UploadFromStream("filename", r)
   120  		assert.Nil(mt, err, "UploadFromStream error: %v", err)
   121  
   122  		findCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   123  		defer cancel()
   124  		findIndex(findCtx, mt, mt.DB.Collection("fs.files"), false, "key", "filename")
   125  		findIndex(findCtx, mt, mt.DB.Collection("fs.chunks"), true, "key", "files_id")
   126  	})
   127  	// should not create a new index if index is numerically the same
   128  	mt.Run("equivalent indexes", func(mt *mtest.T) {
   129  		tests := []struct {
   130  			name        string
   131  			filesIndex  bson.D
   132  			chunksIndex bson.D
   133  			newIndexes  bool
   134  		}{
   135  			{
   136  				"numerically equal",
   137  				bson.D{
   138  					{"key", bson.D{{"filename", float64(1.0)}, {"uploadDate", float64(1.0)}}},
   139  					{"name", "filename_1_uploadDate_1"},
   140  				},
   141  				bson.D{
   142  					{"key", bson.D{{"files_id", float64(1.0)}, {"n", float64(1.0)}}},
   143  					{"name", "files_id_1_n_1"},
   144  					{"unique", true},
   145  				},
   146  				false,
   147  			},
   148  			{
   149  				"numerically inequal",
   150  				bson.D{
   151  					{"key", bson.D{{"filename", float64(-1.0)}, {"uploadDate", float64(1.0)}}},
   152  					{"name", "filename_-1_uploadDate_1"},
   153  				},
   154  				bson.D{
   155  					{"key", bson.D{{"files_id", float64(1.0)}, {"n", float64(-1.0)}}},
   156  					{"name", "files_id_1_n_-1"},
   157  					{"unique", true},
   158  				},
   159  				true,
   160  			},
   161  		}
   162  		for _, test := range tests {
   163  			mt.Run(test.name, func(mt *mtest.T) {
   164  				mt.Run("OpenUploadStream", func(mt *mtest.T) {
   165  					// add indexes with floats to collections manually
   166  					res := mt.DB.RunCommand(context.Background(),
   167  						bson.D{
   168  							{"createIndexes", "fs.files"},
   169  							{"indexes", bson.A{
   170  								test.filesIndex,
   171  							}},
   172  						},
   173  					)
   174  					assert.Nil(mt, res.Err(), "createIndexes error: %v", res.Err())
   175  
   176  					res = mt.DB.RunCommand(context.Background(),
   177  						bson.D{
   178  							{"createIndexes", "fs.chunks"},
   179  							{"indexes", bson.A{
   180  								test.chunksIndex,
   181  							}},
   182  						},
   183  					)
   184  					assert.Nil(mt, res.Err(), "createIndexes error: %v", res.Err())
   185  
   186  					mt.ClearEvents()
   187  
   188  					bucket, err := gridfs.NewBucket(mt.DB)
   189  					assert.Nil(mt, err, "NewBucket error: %v", err)
   190  					defer func() {
   191  						_ = bucket.Drop()
   192  					}()
   193  
   194  					_, err = bucket.OpenUploadStream("filename")
   195  					assert.Nil(mt, err, "OpenUploadStream error: %v", err)
   196  
   197  					mt.FilterStartedEvents(func(evt *event.CommandStartedEvent) bool {
   198  						return evt.CommandName == "createIndexes"
   199  					})
   200  					evt := mt.GetStartedEvent()
   201  					if test.newIndexes {
   202  						if evt == nil {
   203  							mt.Fatalf("expected createIndexes events but got none")
   204  						}
   205  					} else {
   206  						if evt != nil {
   207  							mt.Fatalf("expected no createIndexes events but got %v", evt.Command)
   208  						}
   209  					}
   210  				})
   211  				mt.Run("UploadFromStream", func(mt *mtest.T) {
   212  					// add indexes with floats to collections manually
   213  					res := mt.DB.RunCommand(context.Background(),
   214  						bson.D{
   215  							{"createIndexes", "fs.files"},
   216  							{"indexes", bson.A{
   217  								test.filesIndex,
   218  							}},
   219  						},
   220  					)
   221  					assert.Nil(mt, res.Err(), "createIndexes error: %v", res.Err())
   222  
   223  					res = mt.DB.RunCommand(context.Background(),
   224  						bson.D{
   225  							{"createIndexes", "fs.chunks"},
   226  							{"indexes", bson.A{
   227  								test.chunksIndex,
   228  							}},
   229  						},
   230  					)
   231  					assert.Nil(mt, res.Err(), "createIndexes error: %v", res.Err())
   232  
   233  					mt.ClearEvents()
   234  					var fileContent []byte
   235  					bucket, err := gridfs.NewBucket(mt.DB)
   236  					assert.Nil(mt, err, "NewBucket error: %v", err)
   237  					defer func() {
   238  						_ = bucket.Drop()
   239  					}()
   240  
   241  					_, err = bucket.UploadFromStream("filename", bytes.NewBuffer(fileContent))
   242  					assert.Nil(mt, err, "UploadFromStream error: %v", err)
   243  
   244  					mt.FilterStartedEvents(func(evt *event.CommandStartedEvent) bool {
   245  						return evt.CommandName == "createIndexes"
   246  					})
   247  					evt := mt.GetStartedEvent()
   248  					if test.newIndexes {
   249  						if evt == nil {
   250  							mt.Fatalf("expected createIndexes events but got none")
   251  						}
   252  					} else {
   253  						if evt != nil {
   254  							mt.Fatalf("expected no createIndexes events but got %v", evt.Command)
   255  						}
   256  					}
   257  				})
   258  			})
   259  		}
   260  	})
   261  
   262  	mt.RunOpts("download", noClientOpts, func(mt *mtest.T) {
   263  		mt.RunOpts("get file data", noClientOpts, func(mt *mtest.T) {
   264  			// Tests for the DownloadStream.GetFile method.
   265  
   266  			fileName := "get-file-data-test"
   267  			fileData := []byte{1, 2, 3, 4}
   268  			fileMetadata := bson.D{{"k1", "v1"}, {"k2", "v2"}}
   269  			rawMetadata, err := bson.Marshal(fileMetadata)
   270  			assert.Nil(mt, err, "Marshal error: %v", err)
   271  			uploadOpts := options.GridFSUpload().SetMetadata(fileMetadata)
   272  
   273  			testCases := []struct {
   274  				name   string
   275  				fileID interface{}
   276  			}{
   277  				{"default ID", nil},
   278  				{"custom ID type", "customID"},
   279  			}
   280  			for _, tc := range testCases {
   281  				mt.Run(tc.name, func(mt *mtest.T) {
   282  					// Create a new GridFS bucket.
   283  					bucket, err := gridfs.NewBucket(mt.DB)
   284  					assert.Nil(mt, err, "NewBucket error: %v", err)
   285  					defer func() { _ = bucket.Drop() }()
   286  
   287  					// Upload the file and store the uploaded file ID.
   288  					uploadedFileID := tc.fileID
   289  					dataReader := bytes.NewReader(fileData)
   290  					if uploadedFileID == nil {
   291  						uploadedFileID, err = bucket.UploadFromStream(fileName, dataReader, uploadOpts)
   292  					} else {
   293  						err = bucket.UploadFromStreamWithID(tc.fileID, fileName, dataReader, uploadOpts)
   294  					}
   295  					assert.Nil(mt, err, "error uploading file: %v", err)
   296  
   297  					// The uploadDate field is calculated when the upload is complete. Manually fetch it from the
   298  					// fs.files collection to use in assertions.
   299  					filesColl := mt.DB.Collection("fs.files")
   300  					uploadedFileDoc, err := filesColl.FindOne(context.Background(), bson.D{}).Raw()
   301  					assert.Nil(mt, err, "FindOne error: %v", err)
   302  					uploadTime := uploadedFileDoc.Lookup("uploadDate").Time().UTC()
   303  
   304  					expectedFile := &gridfs.File{
   305  						ID:         uploadedFileID,
   306  						Length:     int64(len(fileData)),
   307  						ChunkSize:  gridfs.DefaultChunkSize,
   308  						UploadDate: uploadTime,
   309  						Name:       fileName,
   310  						Metadata:   rawMetadata,
   311  					}
   312  					// For both methods that create a DownloadStream, open a stream and compare the file given by the
   313  					// stream to the expected File object.
   314  					mt.RunOpts("OpenDownloadStream", noClientOpts, func(mt *mtest.T) {
   315  						downloadStream, err := bucket.OpenDownloadStream(uploadedFileID)
   316  						assert.Nil(mt, err, "OpenDownloadStream error: %v", err)
   317  						actualFile := downloadStream.GetFile()
   318  						assert.Equal(mt, expectedFile, actualFile, "expected file %v, got %v", expectedFile, actualFile)
   319  					})
   320  					mt.RunOpts("OpenDownloadStreamByName", noClientOpts, func(mt *mtest.T) {
   321  						downloadStream, err := bucket.OpenDownloadStreamByName(fileName)
   322  						assert.Nil(mt, err, "OpenDownloadStream error: %v", err)
   323  						actualFile := downloadStream.GetFile()
   324  						assert.Equal(mt, expectedFile, actualFile, "expected file %v, got %v", expectedFile, actualFile)
   325  					})
   326  				})
   327  			}
   328  		})
   329  		mt.Run("chunk size determined by files collection document", func(mt *mtest.T) {
   330  			// Test that the chunk size for a file download is determined by the chunkSize field in the files
   331  			// collection document, not the bucket's chunk size.
   332  
   333  			bucket, err := gridfs.NewBucket(mt.DB)
   334  			assert.Nil(mt, err, "NewBucket error: %v", err)
   335  			defer func() { _ = bucket.Drop() }()
   336  
   337  			fileData := []byte("hello world")
   338  			uploadOpts := options.GridFSUpload().SetChunkSizeBytes(4)
   339  			fileID, err := bucket.UploadFromStream("file", bytes.NewReader(fileData), uploadOpts)
   340  			assert.Nil(mt, err, "UploadFromStream error: %v", err)
   341  
   342  			// If the bucket's chunk size was used, this would error because the actual chunk size is 4 and the bucket
   343  			// chunk size is 255 KB.
   344  			var downloadBuffer bytes.Buffer
   345  			_, err = bucket.DownloadToStream(fileID, &downloadBuffer)
   346  			assert.Nil(mt, err, "DownloadToStream error: %v", err)
   347  
   348  			downloadedBytes := downloadBuffer.Bytes()
   349  			assert.Equal(mt, fileData, downloadedBytes, "expected bytes %s, got %s", fileData, downloadedBytes)
   350  		})
   351  		mt.Run("error if files collection document does not have a chunkSize field", func(mt *mtest.T) {
   352  			// Test that opening a download returns ErrMissingChunkSize if the files collection document has no
   353  			// chunk size field.
   354  
   355  			oid := primitive.NewObjectID()
   356  			filesDoc := bson.D{
   357  				{"_id", oid},
   358  				{"length", 10},
   359  				{"filename", "filename"},
   360  			}
   361  			_, err := mt.DB.Collection("fs.files").InsertOne(context.Background(), filesDoc)
   362  			assert.Nil(mt, err, "InsertOne error for files collection: %v", err)
   363  
   364  			bucket, err := gridfs.NewBucket(mt.DB)
   365  			assert.Nil(mt, err, "NewBucket error: %v", err)
   366  			defer func() { _ = bucket.Drop() }()
   367  
   368  			_, err = bucket.OpenDownloadStream(oid)
   369  			assert.Equal(mt, gridfs.ErrMissingChunkSize, err, "expected error %v, got %v", gridfs.ErrMissingChunkSize, err)
   370  		})
   371  		mt.Run("cursor error during read after downloading", func(mt *mtest.T) {
   372  			// To simulate a cursor error we upload a file larger than the 16MB default batch size,
   373  			// so the underlying cursor remains open on the server. Since the ReadDeadline is
   374  			// set in the past, Read should cause a timeout.
   375  
   376  			fileName := "read-error-test"
   377  			fileData := make([]byte, 17000000)
   378  
   379  			bucket, err := gridfs.NewBucket(mt.DB)
   380  			assert.Nil(mt, err, "NewBucket error: %v", err)
   381  			defer func() { _ = bucket.Drop() }()
   382  
   383  			dataReader := bytes.NewReader(fileData)
   384  			_, err = bucket.UploadFromStream(fileName, dataReader)
   385  			assert.Nil(mt, err, "UploadFromStream error: %v", err)
   386  
   387  			ds, err := bucket.OpenDownloadStreamByName(fileName)
   388  			assert.Nil(mt, err, "OpenDownloadStreamByName error: %v", err)
   389  
   390  			err = ds.SetReadDeadline(time.Now().Add(-1 * time.Second))
   391  			assert.Nil(mt, err, "SetReadDeadline error: %v", err)
   392  
   393  			p := make([]byte, len(fileData))
   394  			_, err = ds.Read(p)
   395  			assert.NotNil(mt, err, "expected error from Read, got nil")
   396  			assert.True(mt, mongo.IsTimeout(err), "expected error to be a timeout, got %v", err.Error())
   397  		})
   398  		mt.Run("cursor error during skip after downloading", func(mt *mtest.T) {
   399  			// To simulate a cursor error we upload a file larger than the 16MB default batch size,
   400  			// so the underlying cursor remains open on the server. Since the ReadDeadline is
   401  			// set in the past, Skip should cause a timeout.
   402  
   403  			fileName := "skip-error-test"
   404  			fileData := make([]byte, 17000000)
   405  
   406  			bucket, err := gridfs.NewBucket(mt.DB)
   407  			assert.Nil(mt, err, "NewBucket error: %v", err)
   408  			defer func() { _ = bucket.Drop() }()
   409  
   410  			dataReader := bytes.NewReader(fileData)
   411  			_, err = bucket.UploadFromStream(fileName, dataReader)
   412  			assert.Nil(mt, err, "UploadFromStream error: %v", err)
   413  
   414  			ds, err := bucket.OpenDownloadStreamByName(fileName)
   415  			assert.Nil(mt, err, "OpenDownloadStreamByName error: %v", err)
   416  
   417  			err = ds.SetReadDeadline(time.Now().Add(-1 * time.Second))
   418  			assert.Nil(mt, err, "SetReadDeadline error: %v", err)
   419  
   420  			_, err = ds.Skip(int64(len(fileData)))
   421  			assert.NotNil(mt, err, "expected error from Skip, got nil")
   422  			assert.True(mt, mongo.IsTimeout(err), "expected error to be a timeout, got %v", err.Error())
   423  		})
   424  	})
   425  
   426  	mt.RunOpts("bucket collection accessors", noClientOpts, func(mt *mtest.T) {
   427  		// Tests for the GetFilesCollection and GetChunksCollection accessors.
   428  
   429  		fileData := []byte{1, 2, 3, 4}
   430  		var chunkSize int32 = 2
   431  
   432  		testCases := []struct {
   433  			name       string
   434  			bucketName string // defaults to "fs"
   435  		}{
   436  			{"default bucket name", ""},
   437  			{"custom bucket name", "bucket"},
   438  		}
   439  		for _, tc := range testCases {
   440  			mt.Run(tc.name, func(mt *mtest.T) {
   441  				bucketOpts := options.GridFSBucket().SetChunkSizeBytes(chunkSize)
   442  				if tc.bucketName != "" {
   443  					bucketOpts.SetName(tc.bucketName)
   444  				}
   445  				bucket, err := gridfs.NewBucket(mt.DB, bucketOpts)
   446  				assert.Nil(mt, err, "NewBucket error: %v", err)
   447  				defer func() { _ = bucket.Drop() }()
   448  
   449  				_, err = bucket.UploadFromStream("accessors-test-file", bytes.NewReader(fileData))
   450  				assert.Nil(mt, err, "UploadFromStream error: %v", err)
   451  
   452  				bucketName := tc.bucketName
   453  				if bucketName == "" {
   454  					bucketName = "fs"
   455  				}
   456  				assertGridFSCollectionState(mt, bucket.GetFilesCollection(), bucketName+".files", 1)
   457  				assertGridFSCollectionState(mt, bucket.GetChunksCollection(), bucketName+".chunks", 2)
   458  			})
   459  		}
   460  	})
   461  
   462  	mt.RunOpts("round trip", mtest.NewOptions().MaxServerVersion("3.6"), func(mt *mtest.T) {
   463  		skipRoundTripTest(mt)
   464  		oneK := 1024
   465  		smallBuffSize := 100
   466  
   467  		tests := []struct {
   468  			name      string
   469  			chunkSize int // make -1 for no capacity for no chunkSize
   470  			fileSize  int
   471  			bufSize   int // make -1 for no capacity for no bufSize
   472  		}{
   473  			{"RoundTrip: original", -1, oneK, -1},
   474  			{"RoundTrip: chunk size multiple of file", oneK, oneK * 16, -1},
   475  			{"RoundTrip: chunk size is file size", oneK, oneK, -1},
   476  			{"RoundTrip: chunk size multiple of file size and with strict buffer size", oneK, oneK * 16, smallBuffSize},
   477  			{"RoundTrip: chunk size multiple of file size and buffer size", oneK, oneK * 16, oneK * 16},
   478  			{"RoundTrip: chunk size, file size, buffer size all the same", oneK, oneK, oneK},
   479  		}
   480  
   481  		for _, test := range tests {
   482  			mt.Run(test.name, func(mt *mtest.T) {
   483  				var chunkSize *int32
   484  				var temp int32
   485  				if test.chunkSize != -1 {
   486  					temp = int32(test.chunkSize)
   487  					chunkSize = &temp
   488  				}
   489  
   490  				bucket, err := gridfs.NewBucket(mt.DB, &options.BucketOptions{
   491  					ChunkSizeBytes: chunkSize,
   492  				})
   493  				assert.Nil(mt, err, "NewBucket error: %v", err)
   494  
   495  				timeout := 5 * time.Second
   496  				if israce.Enabled {
   497  					timeout = 20 * time.Second // race detector causes 2-20x slowdown
   498  				}
   499  
   500  				err = bucket.SetWriteDeadline(time.Now().Add(timeout))
   501  				assert.Nil(mt, err, "SetWriteDeadline error: %v", err)
   502  
   503  				// Test that Upload works when the buffer to write is longer than the upload stream's internal buffer.
   504  				// This requires multiple calls to uploadChunks.
   505  				size := test.fileSize
   506  				p := make([]byte, size)
   507  				for i := 0; i < size; i++ {
   508  					p[i] = byte(rand.Intn(100))
   509  				}
   510  
   511  				_, err = bucket.UploadFromStream("filename", bytes.NewReader(p))
   512  				assert.Nil(mt, err, "UploadFromStream error: %v", err)
   513  
   514  				var w *bytes.Buffer
   515  				if test.bufSize == -1 {
   516  					w = bytes.NewBuffer(make([]byte, 0))
   517  				} else {
   518  					w = bytes.NewBuffer(make([]byte, 0, test.bufSize))
   519  				}
   520  
   521  				_, err = bucket.DownloadToStreamByName("filename", w)
   522  				assert.Nil(mt, err, "DownloadToStreamByName error: %v", err)
   523  				assert.Equal(mt, p, w.Bytes(), "downloaded file did not match p")
   524  			})
   525  		}
   526  	})
   527  
   528  	// Regression test for a bug introduced in GODRIVER-2346.
   529  	mt.Run("Find", func(mt *mtest.T) {
   530  		bucket, err := gridfs.NewBucket(mt.DB)
   531  		assert.Nil(mt, err, "NewBucket error: %v", err)
   532  		// Find the file back.
   533  		cursor, err := bucket.Find(bson.D{{"foo", "bar"}})
   534  		defer func() {
   535  			_ = cursor.Close(context.Background())
   536  		}()
   537  
   538  		assert.Nil(mt, err, "Find error: %v", err)
   539  	})
   540  }
   541  
   542  func assertGridFSCollectionState(mt *mtest.T, coll *mongo.Collection, expectedName string, expectedNumDocuments int64) {
   543  	mt.Helper()
   544  
   545  	assert.Equal(mt, expectedName, coll.Name(), "expected collection name %v, got %v", expectedName, coll.Name())
   546  	count, err := coll.CountDocuments(context.Background(), bson.D{})
   547  	assert.Nil(mt, err, "CountDocuments error: %v", err)
   548  	assert.Equal(mt, expectedNumDocuments, count, "expected %d documents in collection, got %d", expectedNumDocuments,
   549  		count)
   550  }
   551  
   552  func findIndex(ctx context.Context, mt *mtest.T, coll *mongo.Collection, unique bool, keys ...string) {
   553  	mt.Helper()
   554  	cur, err := coll.Indexes().List(ctx)
   555  	assert.Nil(mt, err, "Indexes List error: %v", err)
   556  
   557  	foundIndex := false
   558  	for cur.Next(ctx) {
   559  		if _, err := cur.Current.LookupErr(keys...); err == nil {
   560  			if uVal, err := cur.Current.LookupErr("unique"); (unique && err == nil && uVal.Boolean() == true) ||
   561  				(!unique && (err != nil || uVal.Boolean() == false)) {
   562  
   563  				foundIndex = true
   564  			}
   565  		}
   566  	}
   567  	assert.True(mt, foundIndex, "index %v not found", keys)
   568  }
   569  
   570  func skipRoundTripTest(mt *mtest.T) {
   571  	if runtime.GOOS != "darwin" {
   572  		return
   573  	}
   574  
   575  	var serverStatus bson.Raw
   576  	err := mt.DB.RunCommand(
   577  		context.Background(),
   578  		bson.D{{"serverStatus", 1}},
   579  	).Decode(&serverStatus)
   580  	assert.Nil(mt, err, "serverStatus error %v", err)
   581  
   582  	// can run on non-sharded clusters or on sharded cluster with auth/ssl disabled
   583  	_, err = serverStatus.LookupErr("sharding")
   584  	if err != nil {
   585  		return
   586  	}
   587  	_, err = serverStatus.LookupErr("security")
   588  	if err != nil {
   589  		return
   590  	}
   591  	mt.Skip("skipping round trip test")
   592  }
   593  

View as plain text