1
2
3
4
5
6
7 package integration
8
9 import (
10 "bytes"
11 "context"
12 "io"
13 "math/rand"
14 "runtime"
15 "testing"
16 "time"
17
18 "go.mongodb.org/mongo-driver/bson"
19 "go.mongodb.org/mongo-driver/bson/primitive"
20 "go.mongodb.org/mongo-driver/event"
21 "go.mongodb.org/mongo-driver/internal/assert"
22 "go.mongodb.org/mongo-driver/internal/israce"
23 "go.mongodb.org/mongo-driver/mongo"
24 "go.mongodb.org/mongo-driver/mongo/gridfs"
25 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
26 "go.mongodb.org/mongo-driver/mongo/options"
27 )
28
// TestGridFS runs the GridFS integration suite: download skipping, automatic
// index creation, equivalent-index detection, file-metadata retrieval, chunk
// size handling, cursor deadline errors, bucket collection accessors,
// upload/download round trips, and Find.
func TestGridFS(x *testing.T) {
	mt := mtest.New(x, noClientOpts)

	mt.Run("skipping download", func(mt *mtest.T) {
		// Upload 11 bytes in 4-byte chunks, then verify DownloadStream.Skip
		// after reading various prefixes of the file.
		data := []byte("abc.def.ghi")
		var chunkSize int32 = 4

		testcases := []struct {
			name string

			read              int   // bytes read before calling Skip
			skip              int64 // bytes requested from Skip
			expectedSkipN     int64 // bytes Skip should report skipped
			expectedSkipErr   error // error Skip should return
			expectedRemaining int   // bytes still readable after the Skip
		}{
			{
				"read 0, skip 0", 0, 0, 0, nil, 11,
			},
			{
				"read 0, skip to end of chunk", 0, 4, 4, nil, 7,
			},
			{
				"read 0, skip 1", 0, 1, 1, nil, 10,
			},
			{
				"read 1, skip to end of chunk", 1, 3, 3, nil, 7,
			},
			{
				"read all, skip beyond", 11, 1, 0, nil, 0,
			},
			{
				"skip all", 0, 11, 11, nil, 0,
			},
			{
				"read 1, skip to last chunk", 1, 8, 8, nil, 2,
			},
			{
				"read to last chunk, skip to end", 9, 2, 2, nil, 0,
			},
			{
				"read to last chunk, skip beyond", 9, 4, 2, nil, 0,
			},
		}

		for _, tc := range testcases {
			mt.Run(tc.name, func(mt *mtest.T) {
				bucket, err := gridfs.NewBucket(mt.DB, options.GridFSBucket().SetChunkSizeBytes(chunkSize))
				assert.Nil(mt, err, "NewBucket error: %v", err)

				ustream, err := bucket.OpenUploadStream("foo")
				assert.Nil(mt, err, "OpenUploadStream error: %v", err)

				id := ustream.FileID
				_, err = ustream.Write(data)
				assert.Nil(mt, err, "Write error: %v", err)
				err = ustream.Close()
				assert.Nil(mt, err, "Close error: %v", err)

				dstream, err := bucket.OpenDownloadStream(id)
				assert.Nil(mt, err, "OpenDownloadStream error")
				dst := make([]byte, tc.read)
				_, err = dstream.Read(dst)
				assert.Nil(mt, err, "Read error: %v", err)

				n, err := dstream.Skip(tc.skip)
				assert.Equal(mt, tc.expectedSkipErr, err, "expected error on Skip: %v, got %v", tc.expectedSkipErr, err)
				assert.Equal(mt, tc.expectedSkipN, n, "expected Skip to return: %v, got %v", tc.expectedSkipN, n)

				// Read the rest of the file to verify Skip advanced the stream
				// to the expected position.
				dst = make([]byte, len(data))
				remaining, err := dstream.Read(dst)
				if err != nil {
					// io.EOF is acceptable when the Skip consumed the whole file.
					assert.Equal(mt, err, io.EOF, "unexpected Read error: %v", err)
				}
				assert.Equal(mt, tc.expectedRemaining, remaining, "expected remaining data to be: %v, got %v", tc.expectedRemaining, remaining)
			})
		}
	})

	mt.Run("index creation", func(mt *mtest.T) {
		// The first upload through a bucket should create the standard
		// GridFS indexes on fs.files and fs.chunks.
		bucket, err := gridfs.NewBucket(mt.DB)
		assert.Nil(mt, err, "NewBucket error: %v", err)
		err = bucket.SetWriteDeadline(time.Now().Add(5 * time.Second))
		assert.Nil(mt, err, "SetWriteDeadline error: %v", err)

		byteData := []byte("Hello, world!")
		r := bytes.NewReader(byteData)

		_, err = bucket.UploadFromStream("filename", r)
		assert.Nil(mt, err, "UploadFromStream error: %v", err)

		findCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		findIndex(findCtx, mt, mt.DB.Collection("fs.files"), false, "key", "filename")
		findIndex(findCtx, mt, mt.DB.Collection("fs.chunks"), true, "key", "files_id")
	})

	mt.Run("equivalent indexes", func(mt *mtest.T) {
		// Pre-create indexes whose keys are numerically equal (or not) to the
		// ones the driver would create, and assert whether the driver issues
		// its own createIndexes commands.
		tests := []struct {
			name        string
			filesIndex  bson.D
			chunksIndex bson.D
			newIndexes  bool // whether the driver is expected to create new indexes
		}{
			{
				"numerically equal",
				bson.D{
					{"key", bson.D{{"filename", float64(1.0)}, {"uploadDate", float64(1.0)}}},
					{"name", "filename_1_uploadDate_1"},
				},
				bson.D{
					{"key", bson.D{{"files_id", float64(1.0)}, {"n", float64(1.0)}}},
					{"name", "files_id_1_n_1"},
					{"unique", true},
				},
				false,
			},
			{
				"numerically inequal",
				bson.D{
					{"key", bson.D{{"filename", float64(-1.0)}, {"uploadDate", float64(1.0)}}},
					{"name", "filename_-1_uploadDate_1"},
				},
				bson.D{
					{"key", bson.D{{"files_id", float64(1.0)}, {"n", float64(-1.0)}}},
					{"name", "files_id_1_n_-1"},
					{"unique", true},
				},
				true,
			},
		}
		for _, test := range tests {
			mt.Run(test.name, func(mt *mtest.T) {
				mt.Run("OpenUploadStream", func(mt *mtest.T) {
					// Manually create the candidate files index.
					res := mt.DB.RunCommand(context.Background(),
						bson.D{
							{"createIndexes", "fs.files"},
							{"indexes", bson.A{
								test.filesIndex,
							}},
						},
					)
					assert.Nil(mt, res.Err(), "createIndexes error: %v", res.Err())

					// Manually create the candidate chunks index.
					res = mt.DB.RunCommand(context.Background(),
						bson.D{
							{"createIndexes", "fs.chunks"},
							{"indexes", bson.A{
								test.chunksIndex,
							}},
						},
					)
					assert.Nil(mt, res.Err(), "createIndexes error: %v", res.Err())

					mt.ClearEvents()

					bucket, err := gridfs.NewBucket(mt.DB)
					assert.Nil(mt, err, "NewBucket error: %v", err)
					defer func() {
						_ = bucket.Drop()
					}()

					_, err = bucket.OpenUploadStream("filename")
					assert.Nil(mt, err, "OpenUploadStream error: %v", err)

					// Inspect only createIndexes commands sent by the driver.
					mt.FilterStartedEvents(func(evt *event.CommandStartedEvent) bool {
						return evt.CommandName == "createIndexes"
					})
					evt := mt.GetStartedEvent()
					if test.newIndexes {
						if evt == nil {
							mt.Fatalf("expected createIndexes events but got none")
						}
					} else {
						if evt != nil {
							mt.Fatalf("expected no createIndexes events but got %v", evt.Command)
						}
					}
				})
				mt.Run("UploadFromStream", func(mt *mtest.T) {
					// Manually create the candidate files index.
					res := mt.DB.RunCommand(context.Background(),
						bson.D{
							{"createIndexes", "fs.files"},
							{"indexes", bson.A{
								test.filesIndex,
							}},
						},
					)
					assert.Nil(mt, res.Err(), "createIndexes error: %v", res.Err())

					// Manually create the candidate chunks index.
					res = mt.DB.RunCommand(context.Background(),
						bson.D{
							{"createIndexes", "fs.chunks"},
							{"indexes", bson.A{
								test.chunksIndex,
							}},
						},
					)
					assert.Nil(mt, res.Err(), "createIndexes error: %v", res.Err())

					mt.ClearEvents()
					var fileContent []byte
					bucket, err := gridfs.NewBucket(mt.DB)
					assert.Nil(mt, err, "NewBucket error: %v", err)
					defer func() {
						_ = bucket.Drop()
					}()

					_, err = bucket.UploadFromStream("filename", bytes.NewBuffer(fileContent))
					assert.Nil(mt, err, "UploadFromStream error: %v", err)

					// Inspect only createIndexes commands sent by the driver.
					mt.FilterStartedEvents(func(evt *event.CommandStartedEvent) bool {
						return evt.CommandName == "createIndexes"
					})
					evt := mt.GetStartedEvent()
					if test.newIndexes {
						if evt == nil {
							mt.Fatalf("expected createIndexes events but got none")
						}
					} else {
						if evt != nil {
							mt.Fatalf("expected no createIndexes events but got %v", evt.Command)
						}
					}
				})
			})
		}
	})

	mt.RunOpts("download", noClientOpts, func(mt *mtest.T) {
		mt.RunOpts("get file data", noClientOpts, func(mt *mtest.T) {
			// Upload a file with metadata and verify the gridfs.File exposed
			// by the download streams, for both generated and custom file IDs.
			fileName := "get-file-data-test"
			fileData := []byte{1, 2, 3, 4}
			fileMetadata := bson.D{{"k1", "v1"}, {"k2", "v2"}}
			rawMetadata, err := bson.Marshal(fileMetadata)
			assert.Nil(mt, err, "Marshal error: %v", err)
			uploadOpts := options.GridFSUpload().SetMetadata(fileMetadata)

			testCases := []struct {
				name   string
				fileID interface{}
			}{
				{"default ID", nil},
				{"custom ID type", "customID"},
			}
			for _, tc := range testCases {
				mt.Run(tc.name, func(mt *mtest.T) {
					bucket, err := gridfs.NewBucket(mt.DB)
					assert.Nil(mt, err, "NewBucket error: %v", err)
					defer func() { _ = bucket.Drop() }()

					// Upload with either a driver-generated or caller-supplied ID.
					uploadedFileID := tc.fileID
					dataReader := bytes.NewReader(fileData)
					if uploadedFileID == nil {
						uploadedFileID, err = bucket.UploadFromStream(fileName, dataReader, uploadOpts)
					} else {
						err = bucket.UploadFromStreamWithID(tc.fileID, fileName, dataReader, uploadOpts)
					}
					assert.Nil(mt, err, "error uploading file: %v", err)

					// uploadDate is generated during the upload, so read it back
					// from the files document to build the expected File value.
					filesColl := mt.DB.Collection("fs.files")
					uploadedFileDoc, err := filesColl.FindOne(context.Background(), bson.D{}).Raw()
					assert.Nil(mt, err, "FindOne error: %v", err)
					uploadTime := uploadedFileDoc.Lookup("uploadDate").Time().UTC()

					expectedFile := &gridfs.File{
						ID:         uploadedFileID,
						Length:     int64(len(fileData)),
						ChunkSize:  gridfs.DefaultChunkSize,
						UploadDate: uploadTime,
						Name:       fileName,
						Metadata:   rawMetadata,
					}

					mt.RunOpts("OpenDownloadStream", noClientOpts, func(mt *mtest.T) {
						downloadStream, err := bucket.OpenDownloadStream(uploadedFileID)
						assert.Nil(mt, err, "OpenDownloadStream error: %v", err)
						actualFile := downloadStream.GetFile()
						assert.Equal(mt, expectedFile, actualFile, "expected file %v, got %v", expectedFile, actualFile)
					})
					mt.RunOpts("OpenDownloadStreamByName", noClientOpts, func(mt *mtest.T) {
						downloadStream, err := bucket.OpenDownloadStreamByName(fileName)
						assert.Nil(mt, err, "OpenDownloadStream error: %v", err)
						actualFile := downloadStream.GetFile()
						assert.Equal(mt, expectedFile, actualFile, "expected file %v, got %v", expectedFile, actualFile)
					})
				})
			}
		})
		mt.Run("chunk size determined by files collection document", func(mt *mtest.T) {
			// Upload with a per-file chunk size (4) different from the bucket
			// default and verify the download honors the stored chunk size.
			bucket, err := gridfs.NewBucket(mt.DB)
			assert.Nil(mt, err, "NewBucket error: %v", err)
			defer func() { _ = bucket.Drop() }()

			fileData := []byte("hello world")
			uploadOpts := options.GridFSUpload().SetChunkSizeBytes(4)
			fileID, err := bucket.UploadFromStream("file", bytes.NewReader(fileData), uploadOpts)
			assert.Nil(mt, err, "UploadFromStream error: %v", err)

			var downloadBuffer bytes.Buffer
			_, err = bucket.DownloadToStream(fileID, &downloadBuffer)
			assert.Nil(mt, err, "DownloadToStream error: %v", err)

			downloadedBytes := downloadBuffer.Bytes()
			assert.Equal(mt, fileData, downloadedBytes, "expected bytes %s, got %s", fileData, downloadedBytes)
		})
		mt.Run("error if files collection document does not have a chunkSize field", func(mt *mtest.T) {
			// Insert a files document with no chunkSize field and verify that
			// opening a download stream reports ErrMissingChunkSize.
			oid := primitive.NewObjectID()
			filesDoc := bson.D{
				{"_id", oid},
				{"length", 10},
				{"filename", "filename"},
			}
			_, err := mt.DB.Collection("fs.files").InsertOne(context.Background(), filesDoc)
			assert.Nil(mt, err, "InsertOne error for files collection: %v", err)

			bucket, err := gridfs.NewBucket(mt.DB)
			assert.Nil(mt, err, "NewBucket error: %v", err)
			defer func() { _ = bucket.Drop() }()

			_, err = bucket.OpenDownloadStream(oid)
			assert.Equal(mt, gridfs.ErrMissingChunkSize, err, "expected error %v, got %v", gridfs.ErrMissingChunkSize, err)
		})
		mt.Run("cursor error during read after downloading", func(mt *mtest.T) {
			// Upload a file large enough (17MB) that the chunks cursor must
			// fetch more batches, then force a timeout during Read via an
			// already-expired read deadline.
			fileName := "read-error-test"
			fileData := make([]byte, 17000000)

			bucket, err := gridfs.NewBucket(mt.DB)
			assert.Nil(mt, err, "NewBucket error: %v", err)
			defer func() { _ = bucket.Drop() }()

			dataReader := bytes.NewReader(fileData)
			_, err = bucket.UploadFromStream(fileName, dataReader)
			assert.Nil(mt, err, "UploadFromStream error: %v", err)

			ds, err := bucket.OpenDownloadStreamByName(fileName)
			assert.Nil(mt, err, "OpenDownloadStreamByName error: %v", err)

			// A deadline in the past makes the next cursor operation time out.
			err = ds.SetReadDeadline(time.Now().Add(-1 * time.Second))
			assert.Nil(mt, err, "SetReadDeadline error: %v", err)

			p := make([]byte, len(fileData))
			_, err = ds.Read(p)
			assert.NotNil(mt, err, "expected error from Read, got nil")
			assert.True(mt, mongo.IsTimeout(err), "expected error to be a timeout, got %v", err.Error())
		})
		mt.Run("cursor error during skip after downloading", func(mt *mtest.T) {
			// Same as the Read case above, but the timeout surfaces via Skip.
			fileName := "skip-error-test"
			fileData := make([]byte, 17000000)

			bucket, err := gridfs.NewBucket(mt.DB)
			assert.Nil(mt, err, "NewBucket error: %v", err)
			defer func() { _ = bucket.Drop() }()

			dataReader := bytes.NewReader(fileData)
			_, err = bucket.UploadFromStream(fileName, dataReader)
			assert.Nil(mt, err, "UploadFromStream error: %v", err)

			ds, err := bucket.OpenDownloadStreamByName(fileName)
			assert.Nil(mt, err, "OpenDownloadStreamByName error: %v", err)

			// A deadline in the past makes the next cursor operation time out.
			err = ds.SetReadDeadline(time.Now().Add(-1 * time.Second))
			assert.Nil(mt, err, "SetReadDeadline error: %v", err)

			_, err = ds.Skip(int64(len(fileData)))
			assert.NotNil(mt, err, "expected error from Skip, got nil")
			assert.True(mt, mongo.IsTimeout(err), "expected error to be a timeout, got %v", err.Error())
		})
	})

	mt.RunOpts("bucket collection accessors", noClientOpts, func(mt *mtest.T) {
		// Upload 4 bytes in 2-byte chunks (1 files doc, 2 chunks docs) and
		// verify the collections returned by the accessors.
		fileData := []byte{1, 2, 3, 4}
		var chunkSize int32 = 2

		testCases := []struct {
			name       string
			bucketName string
		}{
			{"default bucket name", ""},
			{"custom bucket name", "bucket"},
		}
		for _, tc := range testCases {
			mt.Run(tc.name, func(mt *mtest.T) {
				bucketOpts := options.GridFSBucket().SetChunkSizeBytes(chunkSize)
				if tc.bucketName != "" {
					bucketOpts.SetName(tc.bucketName)
				}
				bucket, err := gridfs.NewBucket(mt.DB, bucketOpts)
				assert.Nil(mt, err, "NewBucket error: %v", err)
				defer func() { _ = bucket.Drop() }()

				_, err = bucket.UploadFromStream("accessors-test-file", bytes.NewReader(fileData))
				assert.Nil(mt, err, "UploadFromStream error: %v", err)

				// "fs" is the default bucket name when none is configured.
				bucketName := tc.bucketName
				if bucketName == "" {
					bucketName = "fs"
				}
				assertGridFSCollectionState(mt, bucket.GetFilesCollection(), bucketName+".files", 1)
				assertGridFSCollectionState(mt, bucket.GetChunksCollection(), bucketName+".chunks", 2)
			})
		}
	})

	mt.RunOpts("round trip", mtest.NewOptions().MaxServerVersion("3.6"), func(mt *mtest.T) {
		skipRoundTripTest(mt)
		oneK := 1024
		smallBuffSize := 100

		tests := []struct {
			name      string
			chunkSize int // -1 means use the bucket default
			fileSize  int
			bufSize   int // -1 means no pre-allocated download buffer capacity
		}{
			{"RoundTrip: original", -1, oneK, -1},
			{"RoundTrip: chunk size multiple of file", oneK, oneK * 16, -1},
			{"RoundTrip: chunk size is file size", oneK, oneK, -1},
			{"RoundTrip: chunk size multiple of file size and with strict buffer size", oneK, oneK * 16, smallBuffSize},
			{"RoundTrip: chunk size multiple of file size and buffer size", oneK, oneK * 16, oneK * 16},
			{"RoundTrip: chunk size, file size, buffer size all the same", oneK, oneK, oneK},
		}

		for _, test := range tests {
			mt.Run(test.name, func(mt *mtest.T) {
				var chunkSize *int32
				var temp int32
				if test.chunkSize != -1 {
					temp = int32(test.chunkSize)
					chunkSize = &temp
				}

				bucket, err := gridfs.NewBucket(mt.DB, &options.BucketOptions{
					ChunkSizeBytes: chunkSize,
				})
				assert.Nil(mt, err, "NewBucket error: %v", err)

				timeout := 5 * time.Second
				if israce.Enabled {
					// Allow extra time when the race detector is enabled.
					timeout = 20 * time.Second
				}

				err = bucket.SetWriteDeadline(time.Now().Add(timeout))
				assert.Nil(mt, err, "SetWriteDeadline error: %v", err)

				// Fill the upload payload with pseudo-random bytes.
				size := test.fileSize
				p := make([]byte, size)
				for i := 0; i < size; i++ {
					p[i] = byte(rand.Intn(100))
				}

				_, err = bucket.UploadFromStream("filename", bytes.NewReader(p))
				assert.Nil(mt, err, "UploadFromStream error: %v", err)

				var w *bytes.Buffer
				if test.bufSize == -1 {
					w = bytes.NewBuffer(make([]byte, 0))
				} else {
					w = bytes.NewBuffer(make([]byte, 0, test.bufSize))
				}

				_, err = bucket.DownloadToStreamByName("filename", w)
				assert.Nil(mt, err, "DownloadToStreamByName error: %v", err)
				assert.Equal(mt, p, w.Bytes(), "downloaded file did not match p")
			})
		}
	})

	mt.Run("Find", func(mt *mtest.T) {
		bucket, err := gridfs.NewBucket(mt.DB)
		assert.Nil(mt, err, "NewBucket error: %v", err)

		cursor, err := bucket.Find(bson.D{{"foo", "bar"}})
		// NOTE(review): the cursor is closed via defer before err is checked;
		// if Find returned an error the cursor may be nil and the deferred
		// Close would panic — confirm Find's contract.
		defer func() {
			_ = cursor.Close(context.Background())
		}()

		assert.Nil(mt, err, "Find error: %v", err)
	})
}
541
542 func assertGridFSCollectionState(mt *mtest.T, coll *mongo.Collection, expectedName string, expectedNumDocuments int64) {
543 mt.Helper()
544
545 assert.Equal(mt, expectedName, coll.Name(), "expected collection name %v, got %v", expectedName, coll.Name())
546 count, err := coll.CountDocuments(context.Background(), bson.D{})
547 assert.Nil(mt, err, "CountDocuments error: %v", err)
548 assert.Equal(mt, expectedNumDocuments, count, "expected %d documents in collection, got %d", expectedNumDocuments,
549 count)
550 }
551
552 func findIndex(ctx context.Context, mt *mtest.T, coll *mongo.Collection, unique bool, keys ...string) {
553 mt.Helper()
554 cur, err := coll.Indexes().List(ctx)
555 assert.Nil(mt, err, "Indexes List error: %v", err)
556
557 foundIndex := false
558 for cur.Next(ctx) {
559 if _, err := cur.Current.LookupErr(keys...); err == nil {
560 if uVal, err := cur.Current.LookupErr("unique"); (unique && err == nil && uVal.Boolean() == true) ||
561 (!unique && (err != nil || uVal.Boolean() == false)) {
562
563 foundIndex = true
564 }
565 }
566 }
567 assert.True(mt, foundIndex, "index %v not found", keys)
568 }
569
570 func skipRoundTripTest(mt *mtest.T) {
571 if runtime.GOOS != "darwin" {
572 return
573 }
574
575 var serverStatus bson.Raw
576 err := mt.DB.RunCommand(
577 context.Background(),
578 bson.D{{"serverStatus", 1}},
579 ).Decode(&serverStatus)
580 assert.Nil(mt, err, "serverStatus error %v", err)
581
582
583 _, err = serverStatus.LookupErr("sharding")
584 if err != nil {
585 return
586 }
587 _, err = serverStatus.LookupErr("security")
588 if err != nil {
589 return
590 }
591 mt.Skip("skipping round trip test")
592 }
593
View as plain text