package gridfs

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/internal/csot"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readconcern"
	"go.mongodb.org/mongo-driver/mongo/readpref"
	"go.mongodb.org/mongo-driver/mongo/writeconcern"
	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)

// DefaultChunkSize is the default size of each file chunk, in bytes (255 KiB).
const DefaultChunkSize int32 = 255 * 1024

// ErrFileNotFound occurs if a file with the given parameters cannot be found in the files collection.
var ErrFileNotFound = errors.New("file with given parameters not found")

// ErrMissingChunkSize occurs when the files collection document is missing the "chunkSize" field.
var ErrMissingChunkSize = errors.New("files collection document does not contain a 'chunkSize' field")

// Bucket represents a GridFS bucket.
type Bucket struct {
	db         *mongo.Database
	chunksColl *mongo.Collection // collection that stores the file chunks
	filesColl  *mongo.Collection // collection that stores the file metadata documents

	name      string
	chunkSize int32
	wc        *writeconcern.WriteConcern
	rc        *readconcern.ReadConcern
	rp        *readpref.ReadPref

	firstWriteDone bool
	readBuf        []byte
	writeBuf       []byte

	readDeadline  time.Time
	writeDeadline time.Time
}

// Upload contains options to upload a file to a bucket.
type Upload struct {
	chunkSize int32
	metadata  bson.D
}

// NewBucket creates a GridFS bucket. If no name is provided in the options, the
// bucket name defaults to "fs".
func NewBucket(db *mongo.Database, opts ...*options.BucketOptions) (*Bucket, error) {
	b := &Bucket{
		name:      "fs",
		chunkSize: DefaultChunkSize,
		db:        db,
		wc:        db.WriteConcern(),
		rc:        db.ReadConcern(),
		rp:        db.ReadPreference(),
	}

	bo := options.MergeBucketOptions(opts...)
	if bo.Name != nil {
		b.name = *bo.Name
	}
	if bo.ChunkSizeBytes != nil {
		b.chunkSize = *bo.ChunkSizeBytes
	}
	if bo.WriteConcern != nil {
		b.wc = bo.WriteConcern
	}
	if bo.ReadConcern != nil {
		b.rc = bo.ReadConcern
	}
	if bo.ReadPreference != nil {
		b.rp = bo.ReadPreference
	}

	var collOpts = options.Collection().SetWriteConcern(b.wc).SetReadConcern(b.rc).SetReadPreference(b.rp)

	b.chunksColl = db.Collection(b.name+".chunks", collOpts)
	b.filesColl = db.Collection(b.name+".files", collOpts)
	b.readBuf = make([]byte, b.chunkSize)
	b.writeBuf = make([]byte, b.chunkSize)

	return b, nil
}
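
// A minimal usage sketch for NewBucket (illustrative only, not part of this file's API):
// it assumes db is an already-connected *mongo.Database and stores files under the
// "images" prefix instead of the default "fs".
//
//	bucket, err := gridfs.NewBucket(db, options.GridFSBucket().SetName("images"))
//	if err != nil {
//		log.Fatal(err)
//	}
//	// bucket now writes to the "images.files" and "images.chunks" collections.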

// SetWriteDeadline sets the write deadline for this bucket.
func (b *Bucket) SetWriteDeadline(t time.Time) error {
	b.writeDeadline = t
	return nil
}

// SetReadDeadline sets the read deadline for this bucket.
func (b *Bucket) SetReadDeadline(t time.Time) error {
	b.readDeadline = t
	return nil
}
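
// A small sketch of how the deadlines are typically used (illustrative; assumes bucket was
// created with NewBucket): bucket methods that take no explicit context derive their context
// from these deadlines via deadlineContext.
//
//	_ = bucket.SetWriteDeadline(time.Now().Add(10 * time.Second))
//	_ = bucket.SetReadDeadline(time.Now().Add(10 * time.Second))
//	// Subsequent calls such as bucket.UploadFromStream or bucket.DownloadToStream
//	// fail once these deadlines pass.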

// OpenUploadStream creates a new file ID and opens an upload stream for a file given the filename.
func (b *Bucket) OpenUploadStream(filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
	return b.OpenUploadStreamWithID(primitive.NewObjectID(), filename, opts...)
}
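
// A streaming-upload sketch (illustrative; assumes bucket is an existing *Bucket and data is
// an io.Reader): write the file through the stream instead of using UploadFromStream.
//
//	us, err := bucket.OpenUploadStream("report.csv", options.GridFSUpload().SetMetadata(bson.D{{"source", "batch"}}))
//	if err != nil {
//		log.Fatal(err)
//	}
//	if _, err := io.Copy(us, data); err != nil {
//		_ = us.Abort() // discard any chunks written so far
//		log.Fatal(err)
//	}
//	if err := us.Close(); err != nil { // Close flushes the final chunk and writes the files document
//		log.Fatal(err)
//	}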

// OpenUploadStreamWithID opens an upload stream for a file given the file ID and filename.
// On the bucket's first write operation, the required GridFS indexes are created if needed.
func (b *Bucket) OpenUploadStreamWithID(fileID interface{}, filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
	ctx, cancel := deadlineContext(b.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	if err := b.checkFirstWrite(ctx); err != nil {
		return nil, err
	}

	upload, err := b.parseUploadOptions(opts...)
	if err != nil {
		return nil, err
	}

	return newUploadStream(upload, fileID, filename, b.chunksColl, b.filesColl), nil
}

// UploadFromStream creates a new file ID and uploads a file given a source stream.
// It returns the generated file ID.
func (b *Bucket) UploadFromStream(filename string, source io.Reader, opts ...*options.UploadOptions) (primitive.ObjectID, error) {
	fileID := primitive.NewObjectID()
	err := b.UploadFromStreamWithID(fileID, filename, source, opts...)
	return fileID, err
}
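
// A minimal upload sketch (illustrative; assumes bucket is an existing *Bucket): the payload
// and filename are placeholders.
//
//	fileID, err := bucket.UploadFromStream("hello.txt", bytes.NewReader([]byte("hello, GridFS")))
//	if err != nil {
//		log.Fatal(err)
//	}
//	fmt.Println("uploaded file with ID", fileID.Hex())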

// UploadFromStreamWithID uploads a file given a source stream and a caller-provided file ID.
func (b *Bucket) UploadFromStreamWithID(fileID interface{}, filename string, source io.Reader, opts ...*options.UploadOptions) error {
	us, err := b.OpenUploadStreamWithID(fileID, filename, opts...)
	if err != nil {
		return err
	}

	err = us.SetWriteDeadline(b.writeDeadline)
	if err != nil {
		_ = us.Close()
		return err
	}

	for {
		n, err := source.Read(b.readBuf)
		if err != nil && err != io.EOF {
			// The upload is aborted if the source stream returns an error.
			_ = us.Abort()
			return err
		}

		if n > 0 {
			_, err := us.Write(b.readBuf[:n])
			if err != nil {
				return err
			}
		}

		if n == 0 || err == io.EOF {
			break
		}
	}

	return us.Close()
}

// OpenDownloadStream creates a stream from which the contents of the file with the given file ID can be read.
func (b *Bucket) OpenDownloadStream(fileID interface{}) (*DownloadStream, error) {
	return b.openDownloadStream(bson.D{
		{"_id", fileID},
	})
}

// DownloadToStream downloads the file with the specified file ID and writes it to the provided io.Writer.
// It returns the number of bytes written to the stream and an error, or nil if there was no error.
func (b *Bucket) DownloadToStream(fileID interface{}, stream io.Writer) (int64, error) {
	ds, err := b.OpenDownloadStream(fileID)
	if err != nil {
		return 0, err
	}

	return b.downloadToStream(ds, stream)
}
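
// A minimal download sketch (illustrative; assumes bucket is an existing *Bucket and fileID
// was returned by a previous upload): the destination file name is a placeholder.
//
//	dst, err := os.Create("hello_copy.txt")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer dst.Close()
//	n, err := bucket.DownloadToStream(fileID, dst)
//	if err != nil {
//		log.Fatal(err)
//	}
//	fmt.Printf("downloaded %d bytes\n", n)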

// OpenDownloadStreamByName opens a download stream for the file with the given filename.
// The Revision option selects which revision of the file to read: 0 is the original file,
// 1 the first revision, and so on, while negative values count back from the most recent
// revision (-1, the default, is the newest).
func (b *Bucket) OpenDownloadStreamByName(filename string, opts ...*options.NameOptions) (*DownloadStream, error) {
	var numSkip int32 = -1
	var sortOrder int32 = 1

	nameOpts := options.MergeNameOptions(opts...)
	if nameOpts.Revision != nil {
		numSkip = *nameOpts.Revision
	}

	if numSkip < 0 {
		sortOrder = -1
		numSkip = (-1 * numSkip) - 1
	}

	findOpts := options.Find().SetSkip(int64(numSkip)).SetSort(bson.D{{"uploadDate", sortOrder}})

	return b.openDownloadStream(bson.D{{"filename", filename}}, findOpts)
}
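
// A revision-selection sketch (illustrative; assumes bucket is an existing *Bucket holding
// several files named "hello.txt"): -1 (the default) reads the newest revision, 0 the original.
//
//	ds, err := bucket.OpenDownloadStreamByName("hello.txt", options.GridFSName().SetRevision(0))
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer ds.Close()
//	original, err := io.ReadAll(ds)
//	if err != nil {
//		log.Fatal(err)
//	}
//	fmt.Println(string(original))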

// DownloadToStreamByName downloads the file with the given name to the given io.Writer.
// It returns the number of bytes written to the stream and an error, or nil if there was no error.
func (b *Bucket) DownloadToStreamByName(filename string, stream io.Writer, opts ...*options.NameOptions) (int64, error) {
	ds, err := b.OpenDownloadStreamByName(filename, opts...)
	if err != nil {
		return 0, err
	}

	return b.downloadToStream(ds, stream)
}

// Delete deletes all chunks and metadata associated with the file with the given file ID.
// The operation runs with a context derived from the bucket's write deadline, if one is set.
func (b *Bucket) Delete(fileID interface{}) error {
	ctx, cancel := deadlineContext(b.writeDeadline)
	if cancel != nil {
		defer cancel()
	}
	return b.DeleteContext(ctx, fileID)
}

// DeleteContext deletes all chunks and metadata associated with the file with the given file ID,
// running the underlying delete operations with the provided context. If no file matches the ID,
// ErrFileNotFound is returned.
func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
	// If a client-level timeout is set and the context is not already a CSOT timeout context,
	// derive a shared timeout context so that both delete operations run under the same timeout.
	if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
		// Redefine ctx to be the new timeout-derived context.
		ctx = newCtx
		// Cancel the timeout-derived context at the end of DeleteContext to avoid a context leak.
		defer cancelFunc()
	}

	// Delete the files collection document first, then the chunks, to minimize race conditions.
	res, err := b.filesColl.DeleteOne(ctx, bson.D{{"_id", fileID}})
	if err == nil && res.DeletedCount == 0 {
		err = ErrFileNotFound
	}
	if err != nil {
		// Still attempt to delete any orphaned chunks before returning the original error.
		_ = b.deleteChunks(ctx, fileID)
		return err
	}

	return b.deleteChunks(ctx, fileID)
}
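
// A deletion sketch (illustrative; assumes bucket is an existing *Bucket and fileID identifies
// a stored file): bound both underlying delete operations with a caller-supplied timeout.
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	if err := bucket.DeleteContext(ctx, fileID); err != nil {
//		if errors.Is(err, gridfs.ErrFileNotFound) {
//			fmt.Println("file already deleted")
//		} else {
//			log.Fatal(err)
//		}
//	}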

// Find returns the files collection documents that match the given filter.
// The operation runs with a context derived from the bucket's read deadline, if one is set.
func (b *Bucket) Find(filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
	ctx, cancel := deadlineContext(b.readDeadline)
	if cancel != nil {
		defer cancel()
	}

	return b.FindContext(ctx, filter, opts...)
}
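
// A metadata-query sketch (illustrative; assumes bucket is an existing *Bucket): list the
// names and lengths of up to 10 files, newest first.
//
//	findOpts := options.GridFSFind().SetLimit(10).SetSort(bson.D{{"uploadDate", -1}})
//	cursor, err := bucket.Find(bson.D{}, findOpts)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer cursor.Close(context.Background())
//	for cursor.Next(context.Background()) {
//		var file gridfs.File
//		if err := cursor.Decode(&file); err != nil {
//			log.Fatal(err)
//		}
//		fmt.Printf("%s: %d bytes\n", file.Name, file.Length)
//	}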

// FindContext returns the files collection documents that match the given filter,
// running the underlying find operation with the provided context.
func (b *Bucket) FindContext(ctx context.Context, filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
	gfsOpts := options.MergeGridFSFindOptions(opts...)
	find := options.Find()
	if gfsOpts.AllowDiskUse != nil {
		find.SetAllowDiskUse(*gfsOpts.AllowDiskUse)
	}
	if gfsOpts.BatchSize != nil {
		find.SetBatchSize(*gfsOpts.BatchSize)
	}
	if gfsOpts.Limit != nil {
		find.SetLimit(int64(*gfsOpts.Limit))
	}
	if gfsOpts.MaxTime != nil {
		find.SetMaxTime(*gfsOpts.MaxTime)
	}
	if gfsOpts.NoCursorTimeout != nil {
		find.SetNoCursorTimeout(*gfsOpts.NoCursorTimeout)
	}
	if gfsOpts.Skip != nil {
		find.SetSkip(int64(*gfsOpts.Skip))
	}
	if gfsOpts.Sort != nil {
		find.SetSort(gfsOpts.Sort)
	}

	return b.filesColl.Find(ctx, filter, find)
}

// Rename renames the stored file with the specified file ID.
// The operation runs with a context derived from the bucket's write deadline, if one is set.
func (b *Bucket) Rename(fileID interface{}, newFilename string) error {
	ctx, cancel := deadlineContext(b.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	return b.RenameContext(ctx, fileID, newFilename)
}

// RenameContext renames the stored file with the specified file ID, running the underlying
// update operation with the provided context. If no file matches the ID, ErrFileNotFound is returned.
func (b *Bucket) RenameContext(ctx context.Context, fileID interface{}, newFilename string) error {
	res, err := b.filesColl.UpdateOne(ctx,
		bson.D{{"_id", fileID}},
		bson.D{{"$set", bson.D{{"filename", newFilename}}}},
	)
	if err != nil {
		return err
	}

	if res.MatchedCount == 0 {
		return ErrFileNotFound
	}

	return nil
}

// Drop drops the files and chunks collections associated with this bucket.
// The operation runs with a context derived from the bucket's write deadline, if one is set.
func (b *Bucket) Drop() error {
	ctx, cancel := deadlineContext(b.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	return b.DropContext(ctx)
}

// DropContext drops the files and chunks collections associated with this bucket,
// running the underlying drop operations with the provided context.
func (b *Bucket) DropContext(ctx context.Context) error {
	// If a client-level timeout is set and the context is not already a CSOT timeout context,
	// derive a shared timeout context so that both drop operations run under the same timeout.
	if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
		// Redefine ctx to be the new timeout-derived context.
		ctx = newCtx
		// Cancel the timeout-derived context at the end of DropContext to avoid a context leak.
		defer cancelFunc()
	}

	err := b.filesColl.Drop(ctx)
	if err != nil {
		return err
	}

	return b.chunksColl.Drop(ctx)
}

// GetFilesCollection returns a handle to the collection that stores the file documents for this bucket.
func (b *Bucket) GetFilesCollection() *mongo.Collection {
	return b.filesColl
}

// GetChunksCollection returns a handle to the collection that stores the file chunks for this bucket.
func (b *Bucket) GetChunksCollection() *mongo.Collection {
	return b.chunksColl
}

func (b *Bucket) openDownloadStream(filter interface{}, opts ...*options.FindOptions) (*DownloadStream, error) {
	ctx, cancel := deadlineContext(b.readDeadline)
	if cancel != nil {
		defer cancel()
	}

	cursor, err := b.findFile(ctx, filter, opts...)
	if err != nil {
		return nil, err
	}

	// Decode the files collection document into a File, which is passed to newDownloadStream.
	var foundFile File
	if err = cursor.Decode(&foundFile); err != nil {
		return nil, fmt.Errorf("error decoding files collection document: %w", err)
	}

	// A zero-length file has no chunks to read.
	if foundFile.Length == 0 {
		return newDownloadStream(nil, foundFile.ChunkSize, &foundFile), nil
	}

	// For a file with non-zero length, the chunkSize field must exist so we know what size to
	// expect when downloading chunks.
	if _, err := cursor.Current.LookupErr("chunkSize"); err != nil {
		return nil, ErrMissingChunkSize
	}

	chunksCursor, err := b.findChunks(ctx, foundFile.ID)
	if err != nil {
		return nil, err
	}

	// Use the chunk size from the files collection document, since it can differ from the bucket's chunk size.
	return newDownloadStream(chunksCursor, foundFile.ChunkSize, &foundFile), nil
}

// deadlineContext returns a Context bounded by the given deadline. If the deadline is the zero
// value, it returns context.Background and a nil CancelFunc.
func deadlineContext(deadline time.Time) (context.Context, context.CancelFunc) {
	if deadline.Equal(time.Time{}) {
		return context.Background(), nil
	}

	return context.WithDeadline(context.Background(), deadline)
}

func (b *Bucket) downloadToStream(ds *DownloadStream, stream io.Writer) (int64, error) {
	err := ds.SetReadDeadline(b.readDeadline)
	if err != nil {
		_ = ds.Close()
		return 0, err
	}

	copied, err := io.Copy(stream, ds)
	if err != nil {
		_ = ds.Close()
		return 0, err
	}

	return copied, ds.Close()
}

func (b *Bucket) deleteChunks(ctx context.Context, fileID interface{}) error {
	_, err := b.chunksColl.DeleteMany(ctx, bson.D{{"files_id", fileID}})
	return err
}

func (b *Bucket) findFile(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) {
	cursor, err := b.filesColl.Find(ctx, filter, opts...)
	if err != nil {
		return nil, err
	}

	if !cursor.Next(ctx) {
		_ = cursor.Close(ctx)
		return nil, ErrFileNotFound
	}

	return cursor, nil
}

func (b *Bucket) findChunks(ctx context.Context, fileID interface{}) (*mongo.Cursor, error) {
	chunksCursor, err := b.chunksColl.Find(ctx,
		bson.D{{"files_id", fileID}},
		options.Find().SetSort(bson.D{{"n", 1}})) // sort by chunk index
	if err != nil {
		return nil, err
	}

	return chunksCursor, nil
}

// numericalIndexDocsEqual reports whether two index key documents are equal, comparing numeric
// key values by their int64 representation so that, for example, an int32 1 and an int64 1 are
// treated as equal.
func numericalIndexDocsEqual(expected, actual bsoncore.Document) (bool, error) {
	if bytes.Equal(expected, actual) {
		return true, nil
	}

	actualElems, err := actual.Elements()
	if err != nil {
		return false, err
	}
	expectedElems, err := expected.Elements()
	if err != nil {
		return false, err
	}

	if len(actualElems) != len(expectedElems) {
		return false, nil
	}

	for idx, expectedElem := range expectedElems {
		actualElem := actualElems[idx]
		if actualElem.Key() != expectedElem.Key() {
			return false, nil
		}

		actualVal := actualElem.Value()
		expectedVal := expectedElem.Value()
		actualInt, actualOK := actualVal.AsInt64OK()
		expectedInt, expectedOK := expectedVal.AsInt64OK()

		// Values that cannot be represented as int64 are not compared numerically.
		if !actualOK || !expectedOK {
			return false, nil
		}

		if actualInt != expectedInt {
			return false, nil
		}
	}
	return true, nil
}

// createNumericalIndexIfNotExists creates the index described by model unless an index with the
// same key document already exists in the index view.
func createNumericalIndexIfNotExists(ctx context.Context, iv mongo.IndexView, model mongo.IndexModel) error {
	c, err := iv.List(ctx)
	if err != nil {
		return err
	}
	defer func() {
		_ = c.Close(ctx)
	}()

	modelKeysBytes, err := bson.Marshal(model.Keys)
	if err != nil {
		return err
	}
	modelKeysDoc := bsoncore.Document(modelKeysBytes)

	for c.Next(ctx) {
		keyElem, err := c.Current.LookupErr("key")
		if err != nil {
			return err
		}

		keyElemDoc := keyElem.Document()

		found, err := numericalIndexDocsEqual(modelKeysDoc, bsoncore.Document(keyElemDoc))
		if err != nil {
			return err
		}
		if found {
			return nil
		}
	}

	_, err = iv.CreateOne(ctx, model)
	return err
}

// createIndexes creates the required GridFS indexes on the files and chunks collections if the
// files collection is empty.
func (b *Bucket) createIndexes(ctx context.Context) error {
	// Use a primary read preference so the emptiness check reflects the most recent writes.
	cloned, err := b.filesColl.Clone(options.Collection().SetReadPreference(readpref.Primary()))
	if err != nil {
		return err
	}

	docRes := cloned.FindOne(ctx, bson.D{}, options.FindOne().SetProjection(bson.D{{"_id", 1}}))

	_, err = docRes.Raw()
	if !errors.Is(err, mongo.ErrNoDocuments) {
		// Either a document was found (err is nil), so the collection is not empty and the
		// indexes are assumed to exist, or the FindOne failed with an unexpected error.
		return err
	}

	filesIv := b.filesColl.Indexes()
	chunksIv := b.chunksColl.Indexes()

	filesModel := mongo.IndexModel{
		Keys: bson.D{
			{"filename", int32(1)},
			{"uploadDate", int32(1)},
		},
	}

	chunksModel := mongo.IndexModel{
		Keys: bson.D{
			{"files_id", int32(1)},
			{"n", int32(1)},
		},
		Options: options.Index().SetUnique(true),
	}

	if err = createNumericalIndexIfNotExists(ctx, filesIv, filesModel); err != nil {
		return err
	}
	return createNumericalIndexIfNotExists(ctx, chunksIv, chunksModel)
}

// checkFirstWrite creates the GridFS indexes before the first write operation on this bucket.
func (b *Bucket) checkFirstWrite(ctx context.Context) error {
	if !b.firstWriteDone {
		// Before the first write, create the GridFS indexes if the files collection is empty.
		if err := b.createIndexes(ctx); err != nil {
			return err
		}
		b.firstWriteDone = true
	}

	return nil
}

func (b *Bucket) parseUploadOptions(opts ...*options.UploadOptions) (*Upload, error) {
	upload := &Upload{
		chunkSize: b.chunkSize, // upload chunk size defaults to the bucket's chunk size
	}

	uo := options.MergeUploadOptions(opts...)
	if uo.ChunkSizeBytes != nil {
		upload.chunkSize = *uo.ChunkSizeBytes
	}
	if uo.Registry == nil {
		uo.Registry = bson.DefaultRegistry
	}
	if uo.Metadata != nil {
		// Re-encode the metadata with the configured registry and decode it into a bson.D so that
		// it can be embedded in the files collection document.
		raw, err := bson.MarshalWithRegistry(uo.Registry, uo.Metadata)
		if err != nil {
			return nil, err
		}
		var doc bson.D
		unMarErr := bson.UnmarshalWithRegistry(uo.Registry, raw, &doc)
		if unMarErr != nil {
			return nil, unMarErr
		}
		upload.metadata = doc
	}

	return upload, nil
}