1
2
3
4
5
6
7 package unified
8
9 import (
10 "bytes"
11 "context"
12 "encoding/hex"
13 "fmt"
14 "io"
15 "time"
16
17 "go.mongodb.org/mongo-driver/bson"
18 "go.mongodb.org/mongo-driver/bson/bsontype"
19 "go.mongodb.org/mongo-driver/mongo/options"
20 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
21 )
22
23 func createBucketFindCursor(ctx context.Context, operation *operation) (*cursorResult, error) {
24 bucket, err := entities(ctx).gridFSBucket(operation.Object)
25 if err != nil {
26 return nil, err
27 }
28
29 var filter bson.Raw
30 opts := options.GridFSFind()
31
32 elems, err := operation.Arguments.Elements()
33 if err != nil {
34 return nil, err
35 }
36 for _, elem := range elems {
37 key := elem.Key()
38 val := elem.Value()
39
40 switch key {
41 case "maxTimeMS":
42 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
43 case "filter":
44 filter = val.Document()
45 default:
46 return nil, fmt.Errorf("unrecognized bucket find option %q", key)
47 }
48 }
49 if filter == nil {
50 return nil, newMissingArgumentError("filter")
51 }
52
53 cursor, err := bucket.FindContext(ctx, filter, opts)
54 res := &cursorResult{
55 cursor: cursor,
56 err: err,
57 }
58 return res, nil
59 }
60
61 func executeBucketDelete(ctx context.Context, operation *operation) (*operationResult, error) {
62 bucket, err := entities(ctx).gridFSBucket(operation.Object)
63 if err != nil {
64 return nil, err
65 }
66
67 var id *bson.RawValue
68
69 elems, err := operation.Arguments.Elements()
70 if err != nil {
71 return nil, err
72 }
73 for _, elem := range elems {
74 key := elem.Key()
75 val := elem.Value()
76
77 switch key {
78 case "id":
79 id = &val
80 default:
81 return nil, fmt.Errorf("unrecognized bucket delete option %q", key)
82 }
83 }
84 if id == nil {
85 return nil, newMissingArgumentError("id")
86 }
87
88 return newErrorResult(bucket.DeleteContext(ctx, *id)), nil
89 }
90
91 func executeBucketDownload(ctx context.Context, operation *operation) (*operationResult, error) {
92 bucket, err := entities(ctx).gridFSBucket(operation.Object)
93 if err != nil {
94 return nil, err
95 }
96
97 var id *bson.RawValue
98 elems, err := operation.Arguments.Elements()
99 if err != nil {
100 return nil, err
101 }
102 for _, elem := range elems {
103 key := elem.Key()
104 val := elem.Value()
105
106 switch key {
107 case "id":
108 id = &val
109 default:
110 return nil, fmt.Errorf("unrecognized bucket download option %q", key)
111 }
112 }
113 if id == nil {
114 return nil, newMissingArgumentError("id")
115 }
116
117 stream, err := bucket.OpenDownloadStream(*id)
118 if err != nil {
119 return newErrorResult(err), nil
120 }
121
122 var buffer bytes.Buffer
123 if _, err := io.Copy(&buffer, stream); err != nil {
124 return newErrorResult(err), nil
125 }
126
127 return newValueResult(bsontype.Binary, bsoncore.AppendBinary(nil, 0, buffer.Bytes()), nil), nil
128 }
129
130 func executeBucketDownloadByName(ctx context.Context, operation *operation) (*operationResult, error) {
131 bucket, err := entities(ctx).gridFSBucket(operation.Object)
132 if err != nil {
133 return nil, err
134 }
135
136 elems, err := operation.Arguments.Elements()
137 if err != nil {
138 return nil, err
139 }
140
141 var filename string
142 opts := options.GridFSName()
143 for _, elem := range elems {
144 key := elem.Key()
145 val := elem.Value()
146
147 switch key {
148 case "filename":
149 filename = val.StringValue()
150 case "revision":
151 opts.SetRevision(val.AsInt32())
152 default:
153 return nil, fmt.Errorf("unrecognized bucket download option %q", key)
154 }
155 }
156 if filename == "" {
157 return nil, newMissingArgumentError("filename")
158 }
159
160 var buf bytes.Buffer
161 _, err = bucket.DownloadToStreamByName(filename, &buf, opts)
162 if err != nil {
163 return newErrorResult(err), nil
164 }
165
166 return newValueResult(bsontype.Binary, bsoncore.AppendBinary(nil, 0, buf.Bytes()), nil), nil
167 }
168
169 func executeBucketDrop(ctx context.Context, operation *operation) (*operationResult, error) {
170 bucket, err := entities(ctx).gridFSBucket(operation.Object)
171 if err != nil {
172 return nil, err
173 }
174
175 return newErrorResult(bucket.DropContext(ctx)), nil
176 }
177
178 func executeBucketRename(ctx context.Context, operation *operation) (*operationResult, error) {
179 bucket, err := entities(ctx).gridFSBucket(operation.Object)
180 if err != nil {
181 return nil, err
182 }
183
184 var id *bson.RawValue
185 var newFilename string
186 elems, err := operation.Arguments.Elements()
187 if err != nil {
188 return nil, err
189 }
190 for _, elem := range elems {
191 key := elem.Key()
192 val := elem.Value()
193
194 switch key {
195 case "id":
196 id = &val
197 case "newFilename":
198 newFilename = val.StringValue()
199 default:
200 return nil, fmt.Errorf("unrecognized bucket rename option %q", key)
201 }
202 }
203 if id == nil {
204 return nil, newMissingArgumentError("id")
205 }
206
207 return newErrorResult(bucket.RenameContext(ctx, id, newFilename)), nil
208 }
209
210 func executeBucketUpload(ctx context.Context, operation *operation) (*operationResult, error) {
211 bucket, err := entities(ctx).gridFSBucket(operation.Object)
212 if err != nil {
213 return nil, err
214 }
215
216 var filename string
217 var fileBytes []byte
218 opts := options.GridFSUpload()
219
220 elems, err := operation.Arguments.Elements()
221 if err != nil {
222 return nil, err
223 }
224 for _, elem := range elems {
225 key := elem.Key()
226 val := elem.Value()
227
228 switch key {
229 case "chunkSizeBytes":
230 opts.SetChunkSizeBytes(val.Int32())
231 case "filename":
232 filename = val.StringValue()
233 case "metadata":
234 opts.SetMetadata(val.Document())
235 case "source":
236 fileBytes, err = hex.DecodeString(val.Document().Lookup("$$hexBytes").StringValue())
237 if err != nil {
238 return nil, fmt.Errorf("error converting source string to bytes: %w", err)
239 }
240 case "contentType":
241 return nil, newSkipTestError("the deprecated contentType file option is not supported")
242 case "disableMD5":
243 return nil, newSkipTestError("the deprecated disableMD5 file option is not supported")
244 default:
245 return nil, fmt.Errorf("unrecognized bucket upload option %q", key)
246 }
247 }
248 if filename == "" {
249 return nil, newMissingArgumentError("filename")
250 }
251 if fileBytes == nil {
252 return nil, newMissingArgumentError("source")
253 }
254
255 fileID, err := bucket.UploadFromStream(filename, bytes.NewReader(fileBytes), opts)
256 if err != nil {
257 return newErrorResult(err), nil
258 }
259
260 if operation.ResultEntityID != nil {
261 fileIDValue := bson.RawValue{
262 Type: bsontype.ObjectID,
263 Value: fileID[:],
264 }
265 if err := entities(ctx).addBSONEntity(*operation.ResultEntityID, fileIDValue); err != nil {
266 return nil, fmt.Errorf("error storing result as BSON entity: %w", err)
267 }
268 }
269
270 return newValueResult(bsontype.ObjectID, fileID[:], nil), nil
271 }
272
View as plain text