1
2
3
4
5
6
7 package gridfs
8
9 import (
10 "errors"
11
12 "context"
13 "time"
14
15 "math"
16
17 "go.mongodb.org/mongo-driver/bson"
18 "go.mongodb.org/mongo-driver/bson/primitive"
19 "go.mongodb.org/mongo-driver/mongo"
20 )
21
22
23
24 const UploadBufferSize = 16 * 1024 * 1024
25
26
27 var ErrStreamClosed = errors.New("stream is closed or aborted")
28
29
30
31
32 type UploadStream struct {
33 *Upload
34 FileID interface{}
35
36 chunkIndex int
37 chunksColl *mongo.Collection
38 filename string
39 filesColl *mongo.Collection
40 closed bool
41 buffer []byte
42 bufferIndex int
43 fileLen int64
44 writeDeadline time.Time
45 }
46
47
48 func newUploadStream(upload *Upload, fileID interface{}, filename string, chunks, files *mongo.Collection) *UploadStream {
49 return &UploadStream{
50 Upload: upload,
51 FileID: fileID,
52
53 chunksColl: chunks,
54 filename: filename,
55 filesColl: files,
56 buffer: make([]byte, UploadBufferSize),
57 }
58 }
59
60
61 func (us *UploadStream) Close() error {
62 if us.closed {
63 return ErrStreamClosed
64 }
65
66 ctx, cancel := deadlineContext(us.writeDeadline)
67 if cancel != nil {
68 defer cancel()
69 }
70
71 if us.bufferIndex != 0 {
72 if err := us.uploadChunks(ctx, true); err != nil {
73 return err
74 }
75 }
76
77 if err := us.createFilesCollDoc(ctx); err != nil {
78 return err
79 }
80
81 us.closed = true
82 return nil
83 }
84
85
86 func (us *UploadStream) SetWriteDeadline(t time.Time) error {
87 if us.closed {
88 return ErrStreamClosed
89 }
90
91 us.writeDeadline = t
92 return nil
93 }
94
95
96
97 func (us *UploadStream) Write(p []byte) (int, error) {
98 if us.closed {
99 return 0, ErrStreamClosed
100 }
101
102 var ctx context.Context
103
104 ctx, cancel := deadlineContext(us.writeDeadline)
105 if cancel != nil {
106 defer cancel()
107 }
108
109 origLen := len(p)
110 for {
111 if len(p) == 0 {
112 break
113 }
114
115 n := copy(us.buffer[us.bufferIndex:], p)
116 p = p[n:]
117 us.bufferIndex += n
118
119 if us.bufferIndex == UploadBufferSize {
120 err := us.uploadChunks(ctx, false)
121 if err != nil {
122 return 0, err
123 }
124 }
125 }
126 return origLen, nil
127 }
128
129
130 func (us *UploadStream) Abort() error {
131 if us.closed {
132 return ErrStreamClosed
133 }
134
135 ctx, cancel := deadlineContext(us.writeDeadline)
136 if cancel != nil {
137 defer cancel()
138 }
139
140 _, err := us.chunksColl.DeleteMany(ctx, bson.D{{"files_id", us.FileID}})
141 if err != nil {
142 return err
143 }
144
145 us.closed = true
146 return nil
147 }
148
149
150
151
152
153 func (us *UploadStream) uploadChunks(ctx context.Context, uploadPartial bool) error {
154 chunks := float64(us.bufferIndex) / float64(us.chunkSize)
155 numChunks := int(math.Ceil(chunks))
156 if !uploadPartial {
157 numChunks = int(math.Floor(chunks))
158 }
159
160 docs := make([]interface{}, numChunks)
161
162 begChunkIndex := us.chunkIndex
163 for i := 0; i < us.bufferIndex; i += int(us.chunkSize) {
164 endIndex := i + int(us.chunkSize)
165 if us.bufferIndex-i < int(us.chunkSize) {
166
167 if !uploadPartial {
168 break
169 }
170 endIndex = us.bufferIndex
171 }
172 chunkData := us.buffer[i:endIndex]
173 docs[us.chunkIndex-begChunkIndex] = bson.D{
174 {"_id", primitive.NewObjectID()},
175 {"files_id", us.FileID},
176 {"n", int32(us.chunkIndex)},
177 {"data", primitive.Binary{Subtype: 0x00, Data: chunkData}},
178 }
179 us.chunkIndex++
180 us.fileLen += int64(len(chunkData))
181 }
182
183 _, err := us.chunksColl.InsertMany(ctx, docs)
184 if err != nil {
185 return err
186 }
187
188
189 bytesUploaded := numChunks * int(us.chunkSize)
190 if bytesUploaded != UploadBufferSize && !uploadPartial {
191 copy(us.buffer[0:], us.buffer[bytesUploaded:us.bufferIndex])
192 }
193 us.bufferIndex = UploadBufferSize - bytesUploaded
194 return nil
195 }
196
197 func (us *UploadStream) createFilesCollDoc(ctx context.Context) error {
198 doc := bson.D{
199 {"_id", us.FileID},
200 {"length", us.fileLen},
201 {"chunkSize", us.chunkSize},
202 {"uploadDate", primitive.DateTime(time.Now().UnixNano() / int64(time.Millisecond))},
203 {"filename", us.filename},
204 }
205
206 if us.metadata != nil {
207 doc = append(doc, bson.E{"metadata", us.metadata})
208 }
209
210 _, err := us.filesColl.InsertOne(ctx, doc)
211 if err != nil {
212 return err
213 }
214
215 return nil
216 }
217
View as plain text