...

Source file src/go.mongodb.org/mongo-driver/mongo/gridfs/upload_stream.go

Documentation: go.mongodb.org/mongo-driver/mongo/gridfs

// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package gridfs

import (
	"context"
	"errors"
	"math"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
)

// UploadBufferSize is the size in bytes of one stream batch. Chunks are written to the database once the sum of
// buffered chunk lengths reaches this size.
const UploadBufferSize = 16 * 1024 * 1024 // 16 MiB

// ErrStreamClosed is an error returned if an operation is attempted on a closed/aborted stream.
var ErrStreamClosed = errors.New("stream is closed or aborted")

// UploadStream is used to upload a file in chunks. This type implements the io.Writer interface and a file can be
// uploaded using the Write method. After an upload is complete, the Close method must be called to write file
// metadata.
type UploadStream struct {
	*Upload // chunk size and metadata
	FileID  interface{}

	chunkIndex    int
	chunksColl    *mongo.Collection // collection to store file chunks
	filename      string
	filesColl     *mongo.Collection // collection to store file metadata
	closed        bool
	buffer        []byte
	bufferIndex   int
	fileLen       int64
	writeDeadline time.Time
}

// newUploadStream creates a new upload stream.
func newUploadStream(upload *Upload, fileID interface{}, filename string, chunks, files *mongo.Collection) *UploadStream {
	return &UploadStream{
		Upload: upload,
		FileID: fileID,

		chunksColl: chunks,
		filename:   filename,
		filesColl:  files,
		buffer:     make([]byte, UploadBufferSize),
	}
}

// Close writes file metadata to the files collection and cleans up any resources associated with the UploadStream.
func (us *UploadStream) Close() error {
	if us.closed {
		return ErrStreamClosed
	}

	ctx, cancel := deadlineContext(us.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	if us.bufferIndex != 0 {
		if err := us.uploadChunks(ctx, true); err != nil {
			return err
		}
	}

	if err := us.createFilesCollDoc(ctx); err != nil {
		return err
	}

	us.closed = true
	return nil
}

// SetWriteDeadline sets the write deadline for this stream.
func (us *UploadStream) SetWriteDeadline(t time.Time) error {
	if us.closed {
		return ErrStreamClosed
	}

	us.writeDeadline = t
	return nil
}

// Write transfers the contents of a byte slice into this upload stream. If the stream's underlying buffer fills up,
// the buffer will be uploaded as chunks to the server. Implements the io.Writer interface.
func (us *UploadStream) Write(p []byte) (int, error) {
	if us.closed {
		return 0, ErrStreamClosed
	}

	ctx, cancel := deadlineContext(us.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	origLen := len(p)
	for {
		if len(p) == 0 {
			break
		}

		n := copy(us.buffer[us.bufferIndex:], p) // copy as much as possible
		p = p[n:]
		us.bufferIndex += n

		if us.bufferIndex == UploadBufferSize {
			err := us.uploadChunks(ctx, false)
			if err != nil {
				return 0, err
			}
		}
	}
	return origLen, nil
}

// Abort closes the stream and deletes all file chunks that have already been written.
func (us *UploadStream) Abort() error {
	if us.closed {
		return ErrStreamClosed
	}

	ctx, cancel := deadlineContext(us.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	_, err := us.chunksColl.DeleteMany(ctx, bson.D{{"files_id", us.FileID}})
	if err != nil {
		return err
	}

	us.closed = true
	return nil
}

// uploadChunks uploads the current buffer as a series of chunks to the bucket. If uploadPartial is true, any data at
// the end of the buffer that is smaller than a chunk will be uploaded as a partial chunk. If it is false, that data
// will be moved to the front of the buffer instead. uploadChunks sets us.bufferIndex to the next available index in
// the buffer after uploading.
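//
// For example (illustrative numbers): with the default 255 KiB chunk size, a full 16 MiB buffer holds 64 complete
// chunks plus a 64 KiB remainder, so uploadChunks writes 65 chunks when uploadPartial is true and 64 chunks when it
// is false, with the 64 KiB remainder moved to the front of the buffer.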
func (us *UploadStream) uploadChunks(ctx context.Context, uploadPartial bool) error {
	chunks := float64(us.bufferIndex) / float64(us.chunkSize)
	numChunks := int(math.Ceil(chunks))
	if !uploadPartial {
		numChunks = int(math.Floor(chunks))
	}

	docs := make([]interface{}, numChunks)

	begChunkIndex := us.chunkIndex
	for i := 0; i < us.bufferIndex; i += int(us.chunkSize) {
		endIndex := i + int(us.chunkSize)
		if us.bufferIndex-i < int(us.chunkSize) {
			// partial chunk
			if !uploadPartial {
				break
			}
			endIndex = us.bufferIndex
		}
		chunkData := us.buffer[i:endIndex]
		docs[us.chunkIndex-begChunkIndex] = bson.D{
			{"_id", primitive.NewObjectID()},
			{"files_id", us.FileID},
			{"n", int32(us.chunkIndex)},
			{"data", primitive.Binary{Subtype: 0x00, Data: chunkData}},
		}
		us.chunkIndex++
		us.fileLen += int64(len(chunkData))
	}

	_, err := us.chunksColl.InsertMany(ctx, docs)
	if err != nil {
		return err
	}

	// Copy any remaining bytes to the beginning of the buffer and set the buffer index. Write only calls this
	// function with uploadPartial=false when the buffer is full, so in that case UploadBufferSize-bytesUploaded is
	// exactly the number of leftover bytes.
	bytesUploaded := numChunks * int(us.chunkSize)
	if bytesUploaded != UploadBufferSize && !uploadPartial {
		copy(us.buffer[0:], us.buffer[bytesUploaded:us.bufferIndex])
	}
	us.bufferIndex = UploadBufferSize - bytesUploaded
	return nil
}

func (us *UploadStream) createFilesCollDoc(ctx context.Context) error {
	doc := bson.D{
		{"_id", us.FileID},
		{"length", us.fileLen},
		{"chunkSize", us.chunkSize},
		{"uploadDate", primitive.DateTime(time.Now().UnixNano() / int64(time.Millisecond))},
		{"filename", us.filename},
	}

	if us.metadata != nil {
		doc = append(doc, bson.E{"metadata", us.metadata})
	}

	_, err := us.filesColl.InsertOne(ctx, doc)
	if err != nil {
		return err
	}

	return nil
}
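
Below is a minimal usage sketch (not part of the file above) showing how an UploadStream is typically obtained from a Bucket and driven through SetWriteDeadline, Write, Close, and Abort. The connection URI, database name, and filename are placeholder assumptions.

package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/gridfs"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Placeholder URI and database name; adjust for a real deployment.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Disconnect(ctx) }()

	bucket, err := gridfs.NewBucket(client.Database("exampledb"))
	if err != nil {
		log.Fatal(err)
	}

	// OpenUploadStream returns an *UploadStream like the one defined above.
	us, err := bucket.OpenUploadStream("example.txt")
	if err != nil {
		log.Fatal(err)
	}

	// Bound the database operations performed by Write, Close, and Abort.
	if err := us.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
		log.Fatal(err)
	}

	if _, err := us.Write([]byte("hello gridfs")); err != nil {
		_ = us.Abort() // delete any chunks already written for this file
		log.Fatal(err)
	}

	// Close flushes any remaining buffered data and writes the files collection document.
	if err := us.Close(); err != nil {
		log.Fatal(err)
	}
	log.Println("uploaded file with ID:", us.FileID)
}

Note that Abort and Close are mutually exclusive: both mark the stream as closed, so whichever is called second returns ErrStreamClosed.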
