1
2
3
4
5
6
7 package unified
8
9 import (
10 "context"
11 "fmt"
12 "time"
13
14 "go.mongodb.org/mongo-driver/bson"
15 "go.mongodb.org/mongo-driver/bson/bsontype"
16 "go.mongodb.org/mongo-driver/internal/bsonutil"
17 "go.mongodb.org/mongo-driver/mongo/options"
18 )
19
20
21
22 func executeCreateView(ctx context.Context, operation *operation) (*operationResult, error) {
23 db, err := entities(ctx).database(operation.Object)
24 if err != nil {
25 return nil, err
26 }
27
28 var collName string
29 var cvo options.CreateViewOptions
30 var viewOn string
31 pipeline := make([]interface{}, 0)
32
33 elems, err := operation.Arguments.Elements()
34 if err != nil {
35 return nil, err
36 }
37
38 for _, elem := range elems {
39 key := elem.Key()
40 val := elem.Value()
41
42 switch key {
43 case "collection":
44 collName = val.StringValue()
45 case "pipeline":
46 pipeline = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
47 case "viewOn":
48 viewOn = val.StringValue()
49 default:
50 return nil, fmt.Errorf("unrecognized createView option %q", key)
51 }
52 }
53 if collName == "" {
54 return nil, newMissingArgumentError("collection")
55 }
56 if viewOn == "" {
57 return nil, newMissingArgumentError("viewOn")
58 }
59
60 err = db.CreateView(ctx, collName, viewOn, pipeline, &cvo)
61 return newErrorResult(err), nil
62 }
63
64 func executeCreateCollection(ctx context.Context, operation *operation) (*operationResult, error) {
65
66
67 createView, err := operation.isCreateView()
68 if err != nil {
69 return nil, err
70 }
71 if createView {
72 return executeCreateView(ctx, operation)
73 }
74
75 db, err := entities(ctx).database(operation.Object)
76 if err != nil {
77 return nil, err
78 }
79
80 var collName string
81 var cco options.CreateCollectionOptions
82 elems, _ := operation.Arguments.Elements()
83 for _, elem := range elems {
84 key := elem.Key()
85 val := elem.Value()
86
87 switch key {
88 case "collection":
89 collName = val.StringValue()
90 case "changeStreamPreAndPostImages":
91 cco.SetChangeStreamPreAndPostImages(val.Document())
92 case "expireAfterSeconds":
93 cco.SetExpireAfterSeconds(int64(val.Int32()))
94 case "capped":
95 cco.SetCapped(val.Boolean())
96 case "size":
97 cco.SetSizeInBytes(val.AsInt64())
98 case "max":
99 cco.SetMaxDocuments(val.AsInt64())
100 case "timeseries":
101 tsElems, err := elem.Value().Document().Elements()
102 if err != nil {
103 return nil, err
104 }
105
106 tso := options.TimeSeries()
107 for _, elem := range tsElems {
108 key := elem.Key()
109 val := elem.Value()
110
111 switch key {
112 case "timeField":
113 tso.SetTimeField(val.StringValue())
114 case "metaField":
115 tso.SetMetaField(val.StringValue())
116 case "granularity":
117 tso.SetGranularity(val.StringValue())
118 case "bucketMaxSpanSeconds":
119 tso.SetBucketMaxSpan(time.Duration(val.Int32()) * time.Second)
120 case "bucketRoundingSeconds":
121 tso.SetBucketRounding(time.Duration(val.Int32()) * time.Second)
122 default:
123 return nil, fmt.Errorf("unrecognized timeseries option %q", key)
124 }
125 }
126 cco.SetTimeSeriesOptions(tso)
127 case "clusteredIndex":
128 cco.SetClusteredIndex(val.Document())
129 default:
130 return nil, fmt.Errorf("unrecognized createCollection option %q", key)
131 }
132 }
133 if collName == "" {
134 return nil, newMissingArgumentError("collection")
135 }
136
137 err = db.CreateCollection(ctx, collName, &cco)
138 if err != nil {
139 return newErrorResult(err), nil
140 }
141
142 if collID := operation.ResultEntityID; collID != nil {
143 collEntityOpts := newCollectionEntityOptions(*collID, operation.Object, collName, nil)
144
145 err := entities(ctx).addCollectionEntity(collEntityOpts)
146 if err != nil {
147 return nil, fmt.Errorf("failed to save collection as entity: %w", err)
148 }
149 }
150
151 return newEmptyResult(), nil
152 }
153
154 func executeDropCollection(ctx context.Context, operation *operation) (*operationResult, error) {
155 db, err := entities(ctx).database(operation.Object)
156 if err != nil {
157 return nil, err
158 }
159
160 var collName string
161 elems, _ := operation.Arguments.Elements()
162 for _, elem := range elems {
163 key := elem.Key()
164 val := elem.Value()
165
166 switch key {
167 case "collection":
168 collName = val.StringValue()
169 default:
170 return nil, fmt.Errorf("unrecognized dropCollection option %q", key)
171 }
172 }
173 if collName == "" {
174 return nil, newMissingArgumentError("collection")
175 }
176
177 err = db.Collection(collName).Drop(ctx)
178 return newErrorResult(err), nil
179 }
180
181 func executeListCollections(ctx context.Context, operation *operation) (*operationResult, error) {
182 db, err := entities(ctx).database(operation.Object)
183 if err != nil {
184 return nil, err
185 }
186
187 listCollArgs, err := createListCollectionsArguments(operation.Arguments)
188 if err != nil {
189 return nil, err
190 }
191
192 cursor, err := db.ListCollections(ctx, listCollArgs.filter, listCollArgs.opts)
193 if err != nil {
194 return newErrorResult(err), nil
195 }
196 defer cursor.Close(ctx)
197
198 var docs []bson.Raw
199 if err := cursor.All(ctx, &docs); err != nil {
200 return newErrorResult(err), nil
201 }
202 return newCursorResult(docs), nil
203 }
204
205 func executeListCollectionNames(ctx context.Context, operation *operation) (*operationResult, error) {
206 db, err := entities(ctx).database(operation.Object)
207 if err != nil {
208 return nil, err
209 }
210
211 listCollArgs, err := createListCollectionsArguments(operation.Arguments)
212 if err != nil {
213 return nil, err
214 }
215
216 names, err := db.ListCollectionNames(ctx, listCollArgs.filter, listCollArgs.opts)
217 if err != nil {
218 return newErrorResult(err), nil
219 }
220 _, data, err := bson.MarshalValue(names)
221 if err != nil {
222 return nil, fmt.Errorf("error converting collection names slice to BSON: %w", err)
223 }
224 return newValueResult(bsontype.Array, data, nil), nil
225 }
226
227 func executeRunCommand(ctx context.Context, operation *operation) (*operationResult, error) {
228 db, err := entities(ctx).database(operation.Object)
229 if err != nil {
230 return nil, err
231 }
232
233 var command bson.Raw
234 opts := options.RunCmd()
235
236 elems, _ := operation.Arguments.Elements()
237 for _, elem := range elems {
238 key := elem.Key()
239 val := elem.Value()
240
241 switch key {
242 case "command":
243 command = val.Document()
244 case "commandName":
245
246
247 case "readConcern":
248
249 return nil, fmt.Errorf("readConcern in runCommand not supported")
250 case "readPreference":
251 var temp ReadPreference
252 if err := bson.Unmarshal(val.Document(), &temp); err != nil {
253 return nil, fmt.Errorf("error unmarshalling readPreference option: %w", err)
254 }
255
256 rp, err := temp.ToReadPrefOption()
257 if err != nil {
258 return nil, fmt.Errorf("error creating readpref.ReadPref object: %w", err)
259 }
260 opts.SetReadPreference(rp)
261 case "writeConcern":
262
263 return nil, fmt.Errorf("writeConcern in runCommand not supported")
264 default:
265 return nil, fmt.Errorf("unrecognized runCommand option %q", key)
266 }
267 }
268 if command == nil {
269 return nil, newMissingArgumentError("command")
270 }
271
272 res, err := db.RunCommand(ctx, command, opts).Raw()
273 return newDocumentResult(res, err), nil
274 }
275
276
277
278 func executeRunCursorCommand(ctx context.Context, operation *operation) (*operationResult, error) {
279 db, err := entities(ctx).database(operation.Object)
280 if err != nil {
281 return nil, err
282 }
283
284 var (
285 batchSize int32
286 command bson.Raw
287 comment bson.Raw
288 maxTime time.Duration
289 )
290
291 opts := options.RunCmd()
292
293 elems, _ := operation.Arguments.Elements()
294 for _, elem := range elems {
295 key := elem.Key()
296 val := elem.Value()
297
298 switch key {
299 case "batchSize":
300 batchSize = val.Int32()
301 case "command":
302 command = val.Document()
303 case "commandName":
304
305
306
307 case "comment":
308 comment = val.Document()
309 case "maxTimeMS":
310 maxTime = time.Duration(val.AsInt64()) * time.Millisecond
311 case "cursorTimeout":
312 return nil, newSkipTestError("cursorTimeout not supported")
313 case "timeoutMode":
314 return nil, newSkipTestError("timeoutMode not supported")
315 default:
316 return nil, fmt.Errorf("unrecognized runCursorCommand option: %q", key)
317 }
318 }
319
320 if command == nil {
321 return nil, newMissingArgumentError("command")
322 }
323
324 cursor, err := db.RunCommandCursor(ctx, command, opts)
325 if err != nil {
326 return newErrorResult(err), nil
327 }
328
329 if batchSize > 0 {
330 cursor.SetBatchSize(batchSize)
331 }
332
333 if maxTime > 0 {
334 cursor.SetMaxTime(maxTime)
335 }
336
337 if len(comment) > 0 {
338 cursor.SetComment(comment)
339 }
340
341
342
343
344
345 var docs []bson.Raw
346 if err := cursor.All(ctx, &docs); err != nil {
347 return newErrorResult(err), nil
348 }
349
350 return newCursorResult(docs), nil
351 }
352
353
354
355 func executeCreateRunCursorCommand(ctx context.Context, operation *operation) (*operationResult, error) {
356 db, err := entities(ctx).database(operation.Object)
357 if err != nil {
358 return nil, err
359 }
360
361 var (
362 batchSize int32
363 command bson.Raw
364 )
365
366 opts := options.RunCmd()
367
368 elems, _ := operation.Arguments.Elements()
369 for _, elem := range elems {
370 key := elem.Key()
371 val := elem.Value()
372
373 switch key {
374 case "batchSize":
375 batchSize = val.Int32()
376 case "command":
377 command = val.Document()
378 case "commandName":
379
380
381
382 case "cursorType":
383 return nil, newSkipTestError("cursorType not supported")
384 case "timeoutMode":
385 return nil, newSkipTestError("timeoutMode not supported")
386 default:
387 return nil, fmt.Errorf("unrecognized createRunCursorCommand option: %q", key)
388 }
389 }
390
391 if command == nil {
392 return nil, newMissingArgumentError("command")
393 }
394
395
396
397 cursor, err := db.RunCommandCursor(ctx, command, opts)
398 if err != nil {
399 return newErrorResult(err), nil
400 }
401
402 if batchSize > 0 {
403 cursor.SetBatchSize(batchSize)
404 }
405
406 if cursorID := operation.ResultEntityID; cursorID != nil {
407 err := entities(ctx).addCursorEntity(*cursorID, cursor)
408 if err != nil {
409 return nil, fmt.Errorf("failed to store result as cursor entity: %w", err)
410 }
411 }
412
413 return newEmptyResult(), nil
414 }
415
View as plain text