1
2
3
4
5
6
7 package unified
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "time"
14
15 "go.mongodb.org/mongo-driver/bson"
16 "go.mongodb.org/mongo-driver/bson/bsontype"
17 "go.mongodb.org/mongo-driver/internal/bsonutil"
18 "go.mongodb.org/mongo-driver/mongo"
19 "go.mongodb.org/mongo-driver/mongo/options"
20 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
21 )
22
23
24
25 func executeAggregate(ctx context.Context, operation *operation) (*operationResult, error) {
26 var aggregator interface {
27 Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (*mongo.Cursor, error)
28 }
29 var err error
30
31 aggregator, err = entities(ctx).collection(operation.Object)
32 if err != nil {
33 aggregator, err = entities(ctx).database(operation.Object)
34 }
35 if err != nil {
36 return nil, fmt.Errorf("no database or collection entity found with ID %q", operation.Object)
37 }
38
39 var pipeline []interface{}
40 opts := options.Aggregate()
41
42 elems, err := operation.Arguments.Elements()
43 if err != nil {
44 return nil, err
45 }
46 for _, elem := range elems {
47 key := elem.Key()
48 val := elem.Value()
49
50 switch key {
51 case "allowDiskUse":
52 opts.SetAllowDiskUse(val.Boolean())
53 case "batchSize":
54 opts.SetBatchSize(val.Int32())
55 case "bypassDocumentValidation":
56 opts.SetBypassDocumentValidation(val.Boolean())
57 case "collation":
58 collation, err := createCollation(val.Document())
59 if err != nil {
60 return nil, fmt.Errorf("error creating collation: %w", err)
61 }
62 opts.SetCollation(collation)
63 case "comment":
64
65
66 commentString, err := createCommentString(val)
67 if err != nil {
68 return nil, fmt.Errorf("error creating comment: %w", err)
69 }
70 opts.SetComment(commentString)
71 case "hint":
72 hint, err := createHint(val)
73 if err != nil {
74 return nil, fmt.Errorf("error creating hint: %w", err)
75 }
76 opts.SetHint(hint)
77 case "maxTimeMS":
78 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
79 case "maxAwaitTimeMS":
80 opts.SetMaxAwaitTime(time.Duration(val.Int32()) * time.Millisecond)
81 case "pipeline":
82 pipeline = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
83 case "let":
84 opts.SetLet(val.Document())
85 default:
86 return nil, fmt.Errorf("unrecognized aggregate option %q", key)
87 }
88 }
89 if pipeline == nil {
90 return nil, newMissingArgumentError("pipeline")
91 }
92
93 cursor, err := aggregator.Aggregate(ctx, pipeline, opts)
94 if err != nil {
95 return newErrorResult(err), nil
96 }
97 defer cursor.Close(ctx)
98
99 var docs []bson.Raw
100 if err := cursor.All(ctx, &docs); err != nil {
101 return newErrorResult(err), nil
102 }
103 return newCursorResult(docs), nil
104 }
105
106 func executeBulkWrite(ctx context.Context, operation *operation) (*operationResult, error) {
107 coll, err := entities(ctx).collection(operation.Object)
108 if err != nil {
109 return nil, err
110 }
111
112 var models []mongo.WriteModel
113 opts := options.BulkWrite()
114
115 elems, err := operation.Arguments.Elements()
116 if err != nil {
117 return nil, err
118 }
119 for _, elem := range elems {
120 key := elem.Key()
121 val := elem.Value()
122
123 switch key {
124 case "comment":
125 opts.SetComment(val)
126 case "ordered":
127 opts.SetOrdered(val.Boolean())
128 case "requests":
129 models, err = createBulkWriteModels(val.Array())
130 if err != nil {
131 return nil, fmt.Errorf("error creating write models: %w", err)
132 }
133 case "let":
134 opts.SetLet(val.Document())
135 default:
136 return nil, fmt.Errorf("unrecognized bulkWrite option %q", key)
137 }
138 }
139 if models == nil {
140 return nil, newMissingArgumentError("requests")
141 }
142
143 res, err := coll.BulkWrite(ctx, models, opts)
144 raw := emptyCoreDocument
145 if res != nil {
146 rawUpsertedIDs := emptyDocument
147 var marshalErr error
148 if res.UpsertedIDs != nil {
149 rawUpsertedIDs, marshalErr = bson.Marshal(res.UpsertedIDs)
150 if marshalErr != nil {
151 return nil, fmt.Errorf("error marshalling UpsertedIDs map to BSON: %w", marshalErr)
152 }
153 }
154
155 raw = bsoncore.NewDocumentBuilder().
156 AppendInt64("insertedCount", res.InsertedCount).
157 AppendInt64("deletedCount", res.DeletedCount).
158 AppendInt64("matchedCount", res.MatchedCount).
159 AppendInt64("modifiedCount", res.ModifiedCount).
160 AppendInt64("upsertedCount", res.UpsertedCount).
161 AppendDocument("upsertedIds", rawUpsertedIDs).
162 Build()
163 }
164 return newDocumentResult(raw, err), nil
165 }
166
167 func executeCountDocuments(ctx context.Context, operation *operation) (*operationResult, error) {
168 coll, err := entities(ctx).collection(operation.Object)
169 if err != nil {
170 return nil, err
171 }
172
173 var filter bson.Raw
174 opts := options.Count()
175
176 elems, err := operation.Arguments.Elements()
177 if err != nil {
178 return nil, err
179 }
180 for _, elem := range elems {
181 key := elem.Key()
182 val := elem.Value()
183
184 switch key {
185 case "collation":
186 collation, err := createCollation(val.Document())
187 if err != nil {
188 return nil, fmt.Errorf("error creating collation: %w", err)
189 }
190 opts.SetCollation(collation)
191 case "comment":
192
193
194 commentString, err := createCommentString(val)
195 if err != nil {
196 return nil, fmt.Errorf("error creating comment: %w", err)
197 }
198 opts.SetComment(commentString)
199 case "filter":
200 filter = val.Document()
201 case "hint":
202 hint, err := createHint(val)
203 if err != nil {
204 return nil, fmt.Errorf("error creating hint: %w", err)
205 }
206 opts.SetHint(hint)
207 case "limit":
208 opts.SetLimit(val.Int64())
209 case "maxTimeMS":
210 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
211 case "skip":
212 opts.SetSkip(int64(val.Int32()))
213 default:
214 return nil, fmt.Errorf("unrecognized countDocuments option %q", key)
215 }
216 }
217 if filter == nil {
218 return nil, newMissingArgumentError("filter")
219 }
220
221 count, err := coll.CountDocuments(ctx, filter, opts)
222 if err != nil {
223 return newErrorResult(err), nil
224 }
225 return newValueResult(bsontype.Int64, bsoncore.AppendInt64(nil, count), nil), nil
226 }
227
228 func executeCreateIndex(ctx context.Context, operation *operation) (*operationResult, error) {
229 coll, err := entities(ctx).collection(operation.Object)
230 if err != nil {
231 return nil, err
232 }
233
234 var keys bson.Raw
235 indexOpts := options.Index()
236
237 elems, err := operation.Arguments.Elements()
238 if err != nil {
239 return nil, err
240 }
241 for _, elem := range elems {
242 key := elem.Key()
243 val := elem.Value()
244
245 switch key {
246 case "2dsphereIndexVersion":
247 indexOpts.SetSphereVersion(val.Int32())
248 case "background":
249 indexOpts.SetBackground(val.Boolean())
250 case "bits":
251 indexOpts.SetBits(val.Int32())
252 case "bucketSize":
253 indexOpts.SetBucketSize(val.Int32())
254 case "collation":
255 collation, err := createCollation(val.Document())
256 if err != nil {
257 return nil, fmt.Errorf("error creating collation: %w", err)
258 }
259 indexOpts.SetCollation(collation)
260 case "defaultLanguage":
261 indexOpts.SetDefaultLanguage(val.StringValue())
262 case "expireAfterSeconds":
263 indexOpts.SetExpireAfterSeconds(val.Int32())
264 case "hidden":
265 indexOpts.SetHidden(val.Boolean())
266 case "keys":
267 keys = val.Document()
268 case "languageOverride":
269 indexOpts.SetLanguageOverride(val.StringValue())
270 case "max":
271 indexOpts.SetMax(val.Double())
272 case "min":
273 indexOpts.SetMin(val.Double())
274 case "name":
275 indexOpts.SetName(val.StringValue())
276 case "partialFilterExpression":
277 indexOpts.SetPartialFilterExpression(val.Document())
278 case "sparse":
279 indexOpts.SetSparse(val.Boolean())
280 case "storageEngine":
281 indexOpts.SetStorageEngine(val.Document())
282 case "unique":
283 indexOpts.SetUnique(val.Boolean())
284 case "version":
285 indexOpts.SetVersion(val.Int32())
286 case "textIndexVersion":
287 indexOpts.SetTextVersion(val.Int32())
288 case "weights":
289 indexOpts.SetWeights(val.Document())
290 case "wildcardProjection":
291 indexOpts.SetWildcardProjection(val.Document())
292 default:
293 return nil, fmt.Errorf("unrecognized createIndex option %q", key)
294 }
295 }
296 if keys == nil {
297 return nil, newMissingArgumentError("keys")
298 }
299
300 model := mongo.IndexModel{
301 Keys: keys,
302 Options: indexOpts,
303 }
304 name, err := coll.Indexes().CreateOne(ctx, model)
305 return newValueResult(bsontype.String, bsoncore.AppendString(nil, name), err), nil
306 }
307
308 func executeCreateSearchIndex(ctx context.Context, operation *operation) (*operationResult, error) {
309 coll, err := entities(ctx).collection(operation.Object)
310 if err != nil {
311 return nil, err
312 }
313
314 var model mongo.SearchIndexModel
315
316 elems, err := operation.Arguments.Elements()
317 if err != nil {
318 return nil, err
319 }
320 for _, elem := range elems {
321 key := elem.Key()
322 val := elem.Value()
323
324 switch key {
325 case "model":
326 var m struct {
327 Definition interface{}
328 Name *string
329 }
330 err = bson.Unmarshal(val.Document(), &m)
331 if err != nil {
332 return nil, err
333 }
334 model.Definition = m.Definition
335 model.Options = options.SearchIndexes()
336 model.Options.Name = m.Name
337 default:
338 return nil, fmt.Errorf("unrecognized createSearchIndex option %q", key)
339 }
340 }
341
342 name, err := coll.SearchIndexes().CreateOne(ctx, model)
343 return newValueResult(bsontype.String, bsoncore.AppendString(nil, name), err), nil
344 }
345
346 func executeCreateSearchIndexes(ctx context.Context, operation *operation) (*operationResult, error) {
347 coll, err := entities(ctx).collection(operation.Object)
348 if err != nil {
349 return nil, err
350 }
351
352 var models []mongo.SearchIndexModel
353
354 elems, err := operation.Arguments.Elements()
355 if err != nil {
356 return nil, err
357 }
358 for _, elem := range elems {
359 key := elem.Key()
360 val := elem.Value()
361
362 switch key {
363 case "models":
364 vals, err := val.Array().Values()
365 if err != nil {
366 return nil, err
367 }
368 for _, val := range vals {
369 var m struct {
370 Definition interface{}
371 Name *string
372 }
373 err = bson.Unmarshal(val.Value, &m)
374 if err != nil {
375 return nil, err
376 }
377 model := mongo.SearchIndexModel{
378 Definition: m.Definition,
379 Options: options.SearchIndexes(),
380 }
381 model.Options.Name = m.Name
382 models = append(models, model)
383 }
384 default:
385 return nil, fmt.Errorf("unrecognized createSearchIndexes option %q", key)
386 }
387 }
388
389 names, err := coll.SearchIndexes().CreateMany(ctx, models)
390 builder := bsoncore.NewArrayBuilder()
391 for _, name := range names {
392 builder.AppendString(name)
393 }
394 return newValueResult(bsontype.Array, builder.Build(), err), nil
395 }
396
397 func executeDeleteOne(ctx context.Context, operation *operation) (*operationResult, error) {
398 coll, err := entities(ctx).collection(operation.Object)
399 if err != nil {
400 return nil, err
401 }
402
403 var filter bson.Raw
404 opts := options.Delete()
405
406 elems, err := operation.Arguments.Elements()
407 if err != nil {
408 return nil, err
409 }
410 for _, elem := range elems {
411 key := elem.Key()
412 val := elem.Value()
413
414 switch key {
415 case "collation":
416 collation, err := createCollation(val.Document())
417 if err != nil {
418 return nil, fmt.Errorf("error creating collation: %w", err)
419 }
420 opts.SetCollation(collation)
421 case "comment":
422 opts.SetComment(val)
423 case "filter":
424 filter = val.Document()
425 case "hint":
426 hint, err := createHint(val)
427 if err != nil {
428 return nil, fmt.Errorf("error creating hint: %w", err)
429 }
430 opts.SetHint(hint)
431 case "let":
432 opts.SetLet(val.Document())
433 default:
434 return nil, fmt.Errorf("unrecognized deleteOne option %q", key)
435 }
436 }
437 if filter == nil {
438 return nil, newMissingArgumentError("filter")
439 }
440
441 res, err := coll.DeleteOne(ctx, filter, opts)
442 raw := emptyCoreDocument
443 if res != nil {
444 raw = bsoncore.NewDocumentBuilder().
445 AppendInt64("deletedCount", res.DeletedCount).
446 Build()
447 }
448 return newDocumentResult(raw, err), nil
449 }
450
451 func executeDeleteMany(ctx context.Context, operation *operation) (*operationResult, error) {
452 coll, err := entities(ctx).collection(operation.Object)
453 if err != nil {
454 return nil, err
455 }
456
457 var filter bson.Raw
458 opts := options.Delete()
459
460 elems, err := operation.Arguments.Elements()
461 if err != nil {
462 return nil, err
463 }
464 for _, elem := range elems {
465 key := elem.Key()
466 val := elem.Value()
467
468 switch key {
469 case "comment":
470 opts.SetComment(val)
471 case "collation":
472 collation, err := createCollation(val.Document())
473 if err != nil {
474 return nil, fmt.Errorf("error creating collation: %w", err)
475 }
476 opts.SetCollation(collation)
477 case "filter":
478 filter = val.Document()
479 case "hint":
480 hint, err := createHint(val)
481 if err != nil {
482 return nil, fmt.Errorf("error creating hint: %w", err)
483 }
484 opts.SetHint(hint)
485 case "let":
486 opts.SetLet(val.Document())
487 default:
488 return nil, fmt.Errorf("unrecognized deleteMany option %q", key)
489 }
490 }
491 if filter == nil {
492 return nil, newMissingArgumentError("filter")
493 }
494
495 res, err := coll.DeleteMany(ctx, filter, opts)
496 raw := emptyCoreDocument
497 if res != nil {
498 raw = bsoncore.NewDocumentBuilder().
499 AppendInt64("deletedCount", res.DeletedCount).
500 Build()
501 }
502 return newDocumentResult(raw, err), nil
503 }
504
505 func executeDistinct(ctx context.Context, operation *operation) (*operationResult, error) {
506 coll, err := entities(ctx).collection(operation.Object)
507 if err != nil {
508 return nil, err
509 }
510
511 var fieldName string
512 var filter bson.Raw
513 opts := options.Distinct()
514
515 elems, err := operation.Arguments.Elements()
516 if err != nil {
517 return nil, err
518 }
519 for _, elem := range elems {
520 key := elem.Key()
521 val := elem.Value()
522
523 switch key {
524 case "collation":
525 collation, err := createCollation(val.Document())
526 if err != nil {
527 return nil, fmt.Errorf("error creating collation: %w", err)
528 }
529 opts.SetCollation(collation)
530 case "comment":
531 opts.SetComment(val)
532 case "fieldName":
533 fieldName = val.StringValue()
534 case "filter":
535 filter = val.Document()
536 case "maxTimeMS":
537 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
538 default:
539 return nil, fmt.Errorf("unrecognized distinct option %q", key)
540 }
541 }
542 if fieldName == "" {
543 return nil, newMissingArgumentError("fieldName")
544 }
545 if filter == nil {
546 return nil, newMissingArgumentError("filter")
547 }
548
549 res, err := coll.Distinct(ctx, fieldName, filter, opts)
550 if err != nil {
551 return newErrorResult(err), nil
552 }
553 _, rawRes, err := bson.MarshalValue(res)
554 if err != nil {
555 return nil, fmt.Errorf("error converting Distinct result to raw BSON: %w", err)
556 }
557 return newValueResult(bsontype.Array, rawRes, nil), nil
558 }
559
560 func executeDropIndex(ctx context.Context, operation *operation) (*operationResult, error) {
561 coll, err := entities(ctx).collection(operation.Object)
562 if err != nil {
563 return nil, err
564 }
565
566 var name string
567 dropIndexOpts := options.DropIndexes()
568
569 elems, _ := operation.Arguments.Elements()
570 for _, elem := range elems {
571 key := elem.Key()
572 val := elem.Value()
573
574 switch key {
575 case "name":
576 name = val.StringValue()
577 case "maxTimeMS":
578 dropIndexOpts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
579 default:
580 return nil, fmt.Errorf("unrecognized dropIndex option %q", key)
581 }
582 }
583 if name == "" {
584 return nil, newMissingArgumentError("name")
585 }
586
587 res, err := coll.Indexes().DropOne(ctx, name, dropIndexOpts)
588 return newDocumentResult(res, err), nil
589 }
590
591 func executeDropIndexes(ctx context.Context, operation *operation) (*operationResult, error) {
592 coll, err := entities(ctx).collection(operation.Object)
593 if err != nil {
594 return nil, err
595 }
596
597 dropIndexOpts := options.DropIndexes()
598 elems, _ := operation.Arguments.Elements()
599 for _, elem := range elems {
600 key := elem.Key()
601 val := elem.Value()
602
603 switch key {
604 case "maxTimeMS":
605 dropIndexOpts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
606 default:
607 return nil, fmt.Errorf("unrecognized dropIndexes option %q", key)
608 }
609 }
610
611 res, err := coll.Indexes().DropAll(ctx, dropIndexOpts)
612 return newDocumentResult(res, err), nil
613 }
614
615 func executeDropSearchIndex(ctx context.Context, operation *operation) (*operationResult, error) {
616 coll, err := entities(ctx).collection(operation.Object)
617 if err != nil {
618 return nil, err
619 }
620
621 var name string
622
623 elems, err := operation.Arguments.Elements()
624 if err != nil {
625 return nil, err
626 }
627 for _, elem := range elems {
628 key := elem.Key()
629 val := elem.Value()
630
631 switch key {
632 case "name":
633 name = val.StringValue()
634 default:
635 return nil, fmt.Errorf("unrecognized dropSearchIndex option %q", key)
636 }
637 }
638
639 err = coll.SearchIndexes().DropOne(ctx, name)
640 return newValueResult(bsontype.Null, nil, err), nil
641 }
642
643 func executeEstimatedDocumentCount(ctx context.Context, operation *operation) (*operationResult, error) {
644 coll, err := entities(ctx).collection(operation.Object)
645 if err != nil {
646 return nil, err
647 }
648
649 opts := options.EstimatedDocumentCount()
650 var elems []bson.RawElement
651
652 if operation.Arguments != nil {
653 elems, err = operation.Arguments.Elements()
654 if err != nil {
655 return nil, err
656 }
657 }
658 for _, elem := range elems {
659 key := elem.Key()
660 val := elem.Value()
661
662 switch key {
663 case "comment":
664 opts.SetComment(val)
665 case "maxTimeMS":
666 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
667 default:
668 return nil, fmt.Errorf("unrecognized estimatedDocumentCount option %q", key)
669 }
670 }
671
672 count, err := coll.EstimatedDocumentCount(ctx, opts)
673 if err != nil {
674 return newErrorResult(err), nil
675 }
676 return newValueResult(bsontype.Int64, bsoncore.AppendInt64(nil, count), nil), nil
677 }
678
679 func executeCreateFindCursor(ctx context.Context, operation *operation) (*operationResult, error) {
680 result, err := createFindCursor(ctx, operation)
681 if err != nil {
682 return nil, err
683 }
684 if result.err != nil {
685 return newErrorResult(result.err), nil
686 }
687
688 if operation.ResultEntityID == nil {
689 return nil, fmt.Errorf("no entity name provided to store executeCreateFindCursor result")
690 }
691 if err := entities(ctx).addCursorEntity(*operation.ResultEntityID, result.cursor); err != nil {
692 return nil, fmt.Errorf("error storing result as cursor entity: %w", err)
693 }
694 return newEmptyResult(), nil
695 }
696
697 func executeFind(ctx context.Context, operation *operation) (*operationResult, error) {
698 result, err := createFindCursor(ctx, operation)
699 if err != nil {
700 return nil, err
701 }
702 if result.err != nil {
703 return newErrorResult(result.err), nil
704 }
705
706 var docs []bson.Raw
707 if err := result.cursor.All(ctx, &docs); err != nil {
708 return newErrorResult(err), nil
709 }
710 return newCursorResult(docs), nil
711 }
712
713 func executeFindOne(ctx context.Context, operation *operation) (*operationResult, error) {
714 coll, err := entities(ctx).collection(operation.Object)
715 if err != nil {
716 return nil, err
717 }
718
719 var filter bson.Raw
720 opts := options.FindOne()
721
722 elems, _ := operation.Arguments.Elements()
723 for _, elem := range elems {
724 key := elem.Key()
725 val := elem.Value()
726
727 switch key {
728 case "collation":
729 collation, err := createCollation(val.Document())
730 if err != nil {
731 return nil, fmt.Errorf("error creating collation: %w", err)
732 }
733 opts.SetCollation(collation)
734 case "filter":
735 filter = val.Document()
736 case "hint":
737 hint, err := createHint(val)
738 if err != nil {
739 return nil, fmt.Errorf("error creating hint: %w", err)
740 }
741 opts.SetHint(hint)
742 case "maxTimeMS":
743 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
744 case "projection":
745 opts.SetProjection(val.Document())
746 case "sort":
747 opts.SetSort(val.Document())
748 default:
749 return nil, fmt.Errorf("unrecognized findOne option %q", key)
750 }
751 }
752 if filter == nil {
753 return nil, newMissingArgumentError("filter")
754 }
755
756 res, err := coll.FindOne(ctx, filter, opts).Raw()
757
758
759
760 if errors.Is(err, mongo.ErrNoDocuments) {
761 err = nil
762 }
763
764 return newDocumentResult(res, err), nil
765 }
766
767 func executeFindOneAndDelete(ctx context.Context, operation *operation) (*operationResult, error) {
768 coll, err := entities(ctx).collection(operation.Object)
769 if err != nil {
770 return nil, err
771 }
772
773 var filter bson.Raw
774 opts := options.FindOneAndDelete()
775
776 elems, err := operation.Arguments.Elements()
777 if err != nil {
778 return nil, err
779 }
780 for _, elem := range elems {
781 key := elem.Key()
782 val := elem.Value()
783
784 switch key {
785 case "collation":
786 collation, err := createCollation(val.Document())
787 if err != nil {
788 return nil, fmt.Errorf("error creating collation: %w", err)
789 }
790 opts.SetCollation(collation)
791 case "comment":
792 opts.SetComment(val)
793 case "filter":
794 filter = val.Document()
795 case "hint":
796 hint, err := createHint(val)
797 if err != nil {
798 return nil, fmt.Errorf("error creating hint: %w", err)
799 }
800 opts.SetHint(hint)
801 case "maxTimeMS":
802 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
803 case "projection":
804 opts.SetProjection(val.Document())
805 case "sort":
806 opts.SetSort(val.Document())
807 case "let":
808 opts.SetLet(val.Document())
809 default:
810 return nil, fmt.Errorf("unrecognized findOneAndDelete option %q", key)
811 }
812 }
813 if filter == nil {
814 return nil, newMissingArgumentError("filter")
815 }
816
817 res, err := coll.FindOneAndDelete(ctx, filter, opts).Raw()
818
819
820
821 if errors.Is(err, mongo.ErrNoDocuments) {
822 err = nil
823 }
824
825 return newDocumentResult(res, err), nil
826 }
827
828 func executeFindOneAndReplace(ctx context.Context, operation *operation) (*operationResult, error) {
829 coll, err := entities(ctx).collection(operation.Object)
830 if err != nil {
831 return nil, err
832 }
833
834 var filter bson.Raw
835 var replacement bson.Raw
836 opts := options.FindOneAndReplace()
837
838 elems, err := operation.Arguments.Elements()
839 if err != nil {
840 return nil, err
841 }
842 for _, elem := range elems {
843 key := elem.Key()
844 val := elem.Value()
845
846 switch key {
847 case "bypassDocumentValidation":
848 opts.SetBypassDocumentValidation(val.Boolean())
849 case "collation":
850 collation, err := createCollation(val.Document())
851 if err != nil {
852 return nil, fmt.Errorf("error creating collation: %w", err)
853 }
854 opts.SetCollation(collation)
855 case "comment":
856 opts.SetComment(val)
857 case "filter":
858 filter = val.Document()
859 case "hint":
860 hint, err := createHint(val)
861 if err != nil {
862 return nil, fmt.Errorf("error creating hint: %w", err)
863 }
864 opts.SetHint(hint)
865 case "let":
866 opts.SetLet(val.Document())
867 case "maxTimeMS":
868 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
869 case "projection":
870 opts.SetProjection(val.Document())
871 case "replacement":
872 replacement = val.Document()
873 case "returnDocument":
874 switch rd := val.StringValue(); rd {
875 case "After":
876 opts.SetReturnDocument(options.After)
877 case "Before":
878 opts.SetReturnDocument(options.Before)
879 default:
880 return nil, fmt.Errorf("unrecognized returnDocument value %q", rd)
881 }
882 case "sort":
883 opts.SetSort(val.Document())
884 case "upsert":
885 opts.SetUpsert(val.Boolean())
886 default:
887 return nil, fmt.Errorf("unrecognized findOneAndReplace option %q", key)
888 }
889 }
890 if filter == nil {
891 return nil, newMissingArgumentError("filter")
892 }
893 if replacement == nil {
894 return nil, newMissingArgumentError("replacement")
895 }
896
897 res, err := coll.FindOneAndReplace(ctx, filter, replacement, opts).Raw()
898
899
900
901 if errors.Is(err, mongo.ErrNoDocuments) {
902 err = nil
903 }
904
905 return newDocumentResult(res, err), nil
906 }
907
908 func executeFindOneAndUpdate(ctx context.Context, operation *operation) (*operationResult, error) {
909 coll, err := entities(ctx).collection(operation.Object)
910 if err != nil {
911 return nil, err
912 }
913
914 var filter bson.Raw
915 var update interface{}
916 opts := options.FindOneAndUpdate()
917
918 elems, err := operation.Arguments.Elements()
919 if err != nil {
920 return nil, err
921 }
922 for _, elem := range elems {
923 key := elem.Key()
924 val := elem.Value()
925
926 switch key {
927 case "arrayFilters":
928 opts.SetArrayFilters(options.ArrayFilters{
929 Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...),
930 })
931 case "bypassDocumentValidation":
932 opts.SetBypassDocumentValidation(val.Boolean())
933 case "collation":
934 collation, err := createCollation(val.Document())
935 if err != nil {
936 return nil, fmt.Errorf("error creating collation: %w", err)
937 }
938 opts.SetCollation(collation)
939 case "comment":
940 opts.SetComment(val)
941 case "filter":
942 filter = val.Document()
943 case "hint":
944 hint, err := createHint(val)
945 if err != nil {
946 return nil, fmt.Errorf("error creating hint: %w", err)
947 }
948 opts.SetHint(hint)
949 case "let":
950 opts.SetLet(val.Document())
951 case "maxTimeMS":
952 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
953 case "projection":
954 opts.SetProjection(val.Document())
955 case "returnDocument":
956 switch rd := val.StringValue(); rd {
957 case "After":
958 opts.SetReturnDocument(options.After)
959 case "Before":
960 opts.SetReturnDocument(options.Before)
961 default:
962 return nil, fmt.Errorf("unrecognized returnDocument value %q", rd)
963 }
964 case "sort":
965 opts.SetSort(val.Document())
966 case "update":
967 update, err = createUpdateValue(val)
968 if err != nil {
969 return nil, fmt.Errorf("error processing update value: %q", err)
970 }
971 case "upsert":
972 opts.SetUpsert(val.Boolean())
973 default:
974 return nil, fmt.Errorf("unrecognized findOneAndUpdate option %q", key)
975 }
976 }
977 if filter == nil {
978 return nil, newMissingArgumentError("filter")
979 }
980 if update == nil {
981 return nil, newMissingArgumentError("update")
982 }
983
984 res, err := coll.FindOneAndUpdate(ctx, filter, update, opts).Raw()
985
986
987
988 if errors.Is(err, mongo.ErrNoDocuments) {
989 err = nil
990 }
991
992 return newDocumentResult(res, err), nil
993 }
994
995 func executeInsertMany(ctx context.Context, operation *operation) (*operationResult, error) {
996 coll, err := entities(ctx).collection(operation.Object)
997 if err != nil {
998 return nil, err
999 }
1000
1001 var documents []interface{}
1002 opts := options.InsertMany()
1003
1004 elems, err := operation.Arguments.Elements()
1005 if err != nil {
1006 return nil, err
1007 }
1008 for _, elem := range elems {
1009 key := elem.Key()
1010 val := elem.Value()
1011
1012 switch key {
1013 case "comment":
1014 opts.SetComment(val)
1015 case "documents":
1016 documents = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
1017 case "ordered":
1018 opts.SetOrdered(val.Boolean())
1019 default:
1020 return nil, fmt.Errorf("unrecognized insertMany option %q", key)
1021 }
1022 }
1023 if documents == nil {
1024 return nil, newMissingArgumentError("documents")
1025 }
1026
1027 res, err := coll.InsertMany(ctx, documents, opts)
1028 raw := emptyCoreDocument
1029 if res != nil {
1030
1031
1032
1033 raw = bsoncore.NewDocumentBuilder().
1034 AppendInt32("insertedCount", int32(len(res.InsertedIDs))).
1035 AppendInt32("deletedCount", 0).
1036 AppendInt32("matchedCount", 0).
1037 AppendInt32("modifiedCount", 0).
1038 AppendInt32("upsertedCount", 0).
1039 AppendDocument("upsertedIds", bsoncore.NewDocumentBuilder().Build()).
1040 Build()
1041 }
1042 return newDocumentResult(raw, err), nil
1043 }
1044
1045 func executeInsertOne(ctx context.Context, operation *operation) (*operationResult, error) {
1046 coll, err := entities(ctx).collection(operation.Object)
1047 if err != nil {
1048 return nil, err
1049 }
1050
1051 var document bson.Raw
1052 opts := options.InsertOne()
1053
1054 elems, err := operation.Arguments.Elements()
1055 if err != nil {
1056 return nil, err
1057 }
1058 for _, elem := range elems {
1059 key := elem.Key()
1060 val := elem.Value()
1061
1062 switch key {
1063 case "document":
1064 document = val.Document()
1065 case "bypassDocumentValidation":
1066 opts.SetBypassDocumentValidation(val.Boolean())
1067 case "comment":
1068 opts.SetComment(val)
1069 default:
1070 return nil, fmt.Errorf("unrecognized insertOne option %q", key)
1071 }
1072 }
1073 if document == nil {
1074 return nil, newMissingArgumentError("documents")
1075 }
1076
1077 res, err := coll.InsertOne(ctx, document, opts)
1078 raw := emptyCoreDocument
1079 if res != nil {
1080 t, data, err := bson.MarshalValue(res.InsertedID)
1081 if err != nil {
1082 return nil, fmt.Errorf("error converting InsertedID field to BSON: %w", err)
1083 }
1084 raw = bsoncore.NewDocumentBuilder().
1085 AppendValue("insertedId", bsoncore.Value{Type: t, Data: data}).
1086 Build()
1087 }
1088 return newDocumentResult(raw, err), nil
1089 }
1090
1091 func executeListIndexes(ctx context.Context, operation *operation) (*operationResult, error) {
1092 coll, err := entities(ctx).collection(operation.Object)
1093 if err != nil {
1094 return nil, err
1095 }
1096
1097 var elems []bson.RawElement
1098
1099 if operation.Arguments != nil {
1100 elems, err = operation.Arguments.Elements()
1101 if err != nil {
1102 return nil, err
1103 }
1104 }
1105 opts := options.ListIndexes()
1106 for _, elem := range elems {
1107 key := elem.Key()
1108 val := elem.Value()
1109
1110 switch key {
1111 case "batchSize":
1112 opts.SetBatchSize(val.Int32())
1113 default:
1114 return nil, fmt.Errorf("unrecognized listIndexes option: %q", key)
1115 }
1116 }
1117
1118 cursor, err := coll.Indexes().List(ctx, opts)
1119 if err != nil {
1120 return newErrorResult(err), nil
1121 }
1122
1123 var docs []bson.Raw
1124 if err := cursor.All(ctx, &docs); err != nil {
1125 return newErrorResult(err), nil
1126 }
1127 return newCursorResult(docs), nil
1128 }
1129
1130 func executeListSearchIndexes(ctx context.Context, operation *operation) (*operationResult, error) {
1131 coll, err := entities(ctx).collection(operation.Object)
1132 if err != nil {
1133 return nil, err
1134 }
1135
1136 searchIdxOpts := options.SearchIndexes()
1137 var opts []*options.ListSearchIndexesOptions
1138
1139 elems, err := operation.Arguments.Elements()
1140 if err != nil {
1141 return nil, err
1142 }
1143 for _, elem := range elems {
1144 key := elem.Key()
1145 val := elem.Value()
1146
1147 switch key {
1148 case "name":
1149 searchIdxOpts.SetName(val.StringValue())
1150 case "aggregationOptions":
1151 var opt options.AggregateOptions
1152 err = bson.Unmarshal(val.Document(), &opt)
1153 if err != nil {
1154 return nil, err
1155 }
1156 opts = append(opts, &options.ListSearchIndexesOptions{
1157 AggregateOpts: &opt,
1158 })
1159 default:
1160 return nil, fmt.Errorf("unrecognized listSearchIndexes option %q", key)
1161 }
1162 }
1163
1164 _, err = coll.SearchIndexes().List(ctx, searchIdxOpts, opts...)
1165 return newValueResult(bsontype.Null, nil, err), nil
1166 }
1167
1168 func executeRenameCollection(ctx context.Context, operation *operation) (*operationResult, error) {
1169 coll, err := entities(ctx).collection(operation.Object)
1170 if err != nil {
1171 return nil, err
1172 }
1173
1174 var toName string
1175 var dropTarget bool
1176 elems, err := operation.Arguments.Elements()
1177 if err != nil {
1178 return nil, err
1179 }
1180 for _, elem := range elems {
1181 key := elem.Key()
1182 val := elem.Value()
1183
1184 switch key {
1185 case "dropTarget":
1186 dropTarget = val.Boolean()
1187 case "to":
1188 toName = val.StringValue()
1189 default:
1190 return nil, fmt.Errorf("unrecognized rename option %q", key)
1191 }
1192 }
1193 if toName == "" {
1194 return nil, newMissingArgumentError("to")
1195 }
1196
1197 renameCmd := bson.D{
1198 {"renameCollection", coll.Database().Name() + "." + coll.Name()},
1199 {"to", coll.Database().Name() + "." + toName},
1200 }
1201 if dropTarget {
1202 renameCmd = append(renameCmd, bson.E{"dropTarget", dropTarget})
1203 }
1204
1205 admin := coll.Database().Client().Database("admin")
1206 res, err := admin.RunCommand(context.Background(), renameCmd).Raw()
1207 return newDocumentResult(res, err), nil
1208 }
1209
1210 func executeReplaceOne(ctx context.Context, operation *operation) (*operationResult, error) {
1211 coll, err := entities(ctx).collection(operation.Object)
1212 if err != nil {
1213 return nil, err
1214 }
1215
1216 filter := emptyDocument
1217 replacement := emptyDocument
1218 opts := options.Replace()
1219
1220 elems, err := operation.Arguments.Elements()
1221 if err != nil {
1222 return nil, err
1223 }
1224 for _, elem := range elems {
1225 key := elem.Key()
1226 val := elem.Value()
1227
1228 switch key {
1229 case "bypassDocumentValidation":
1230 opts.SetBypassDocumentValidation(val.Boolean())
1231 case "collation":
1232 collation, err := createCollation(val.Document())
1233 if err != nil {
1234 return nil, fmt.Errorf("error creating collation: %w", err)
1235 }
1236 opts.SetCollation(collation)
1237 case "comment":
1238 opts.SetComment(val)
1239 case "filter":
1240 filter = val.Document()
1241 case "hint":
1242 hint, err := createHint(val)
1243 if err != nil {
1244 return nil, fmt.Errorf("error creating hint: %w", err)
1245 }
1246 opts.SetHint(hint)
1247 case "replacement":
1248 replacement = val.Document()
1249 case "upsert":
1250 opts.SetUpsert(val.Boolean())
1251 case "let":
1252 opts.SetLet(val.Document())
1253 default:
1254 return nil, fmt.Errorf("unrecognized replaceOne option %q", key)
1255 }
1256 }
1257
1258 res, err := coll.ReplaceOne(ctx, filter, replacement, opts)
1259 raw, buildErr := buildUpdateResultDocument(res)
1260 if buildErr != nil {
1261 return nil, buildErr
1262 }
1263 return newDocumentResult(raw, err), nil
1264 }
1265
1266 func executeUpdateOne(ctx context.Context, operation *operation) (*operationResult, error) {
1267 coll, err := entities(ctx).collection(operation.Object)
1268 if err != nil {
1269 return nil, err
1270 }
1271
1272 updateArgs, err := createUpdateArguments(operation.Arguments)
1273 if err != nil {
1274 return nil, err
1275 }
1276
1277 res, err := coll.UpdateOne(ctx, updateArgs.filter, updateArgs.update, updateArgs.opts)
1278 raw, buildErr := buildUpdateResultDocument(res)
1279 if buildErr != nil {
1280 return nil, buildErr
1281 }
1282 return newDocumentResult(raw, err), nil
1283 }
1284
1285 func executeUpdateMany(ctx context.Context, operation *operation) (*operationResult, error) {
1286 coll, err := entities(ctx).collection(operation.Object)
1287 if err != nil {
1288 return nil, err
1289 }
1290
1291 updateArgs, err := createUpdateArguments(operation.Arguments)
1292 if err != nil {
1293 return nil, err
1294 }
1295
1296 res, err := coll.UpdateMany(ctx, updateArgs.filter, updateArgs.update, updateArgs.opts)
1297 raw, buildErr := buildUpdateResultDocument(res)
1298 if buildErr != nil {
1299 return nil, buildErr
1300 }
1301 return newDocumentResult(raw, err), nil
1302 }
1303
1304 func executeUpdateSearchIndex(ctx context.Context, operation *operation) (*operationResult, error) {
1305 coll, err := entities(ctx).collection(operation.Object)
1306 if err != nil {
1307 return nil, err
1308 }
1309
1310 var name string
1311 var definition interface{}
1312
1313 elems, err := operation.Arguments.Elements()
1314 if err != nil {
1315 return nil, err
1316 }
1317 for _, elem := range elems {
1318 key := elem.Key()
1319 val := elem.Value()
1320
1321 switch key {
1322 case "name":
1323 name = val.StringValue()
1324 case "definition":
1325 err = bson.Unmarshal(val.Value, &definition)
1326 if err != nil {
1327 return nil, err
1328 }
1329 default:
1330 return nil, fmt.Errorf("unrecognized updateSearchIndex option %q", key)
1331 }
1332 }
1333
1334 err = coll.SearchIndexes().UpdateOne(ctx, name, definition)
1335 return newValueResult(bsontype.Null, nil, err), nil
1336 }
1337
1338 func buildUpdateResultDocument(res *mongo.UpdateResult) (bsoncore.Document, error) {
1339 if res == nil {
1340 return emptyCoreDocument, nil
1341 }
1342
1343 builder := bsoncore.NewDocumentBuilder().
1344 AppendInt64("matchedCount", res.MatchedCount).
1345 AppendInt64("modifiedCount", res.ModifiedCount).
1346 AppendInt64("upsertedCount", res.UpsertedCount)
1347
1348 if res.UpsertedID != nil {
1349 t, data, err := bson.MarshalValue(res.UpsertedID)
1350 if err != nil {
1351 return nil, fmt.Errorf("error converting UpsertedID to BSON: %w", err)
1352 }
1353 builder.AppendValue("upsertedId", bsoncore.Value{Type: t, Data: data})
1354 }
1355 return builder.Build(), nil
1356 }
1357
1358 type cursorResult struct {
1359 cursor *mongo.Cursor
1360 err error
1361 }
1362
1363 func createFindCursor(ctx context.Context, operation *operation) (*cursorResult, error) {
1364 coll, err := entities(ctx).collection(operation.Object)
1365
1366
1367 if err != nil {
1368 if _, bucketOk := entities(ctx).gridFSBucket(operation.Object); bucketOk == nil {
1369 return createBucketFindCursor(ctx, operation)
1370 }
1371 return nil, err
1372 }
1373
1374 var filter bson.Raw
1375 opts := options.Find()
1376
1377 elems, err := operation.Arguments.Elements()
1378 if err != nil {
1379 return nil, err
1380 }
1381 for _, elem := range elems {
1382 key := elem.Key()
1383 val := elem.Value()
1384
1385 switch key {
1386 case "allowDiskUse":
1387 opts.SetAllowDiskUse(val.Boolean())
1388 case "allowPartialResults":
1389 opts.SetAllowPartialResults(val.Boolean())
1390 case "batchSize":
1391 opts.SetBatchSize(val.Int32())
1392 case "collation":
1393 collation, err := createCollation(val.Document())
1394 if err != nil {
1395 return nil, fmt.Errorf("error creating collation: %w", err)
1396 }
1397 opts.SetCollation(collation)
1398 case "comment":
1399
1400
1401 commentString, err := createCommentString(val)
1402 if err != nil {
1403 return nil, fmt.Errorf("error creating comment: %w", err)
1404 }
1405 opts.SetComment(commentString)
1406 case "filter":
1407 filter = val.Document()
1408 case "hint":
1409 hint, err := createHint(val)
1410 if err != nil {
1411 return nil, fmt.Errorf("error creating hint: %w", err)
1412 }
1413 opts.SetHint(hint)
1414 case "let":
1415 opts.SetLet(val.Document())
1416 case "limit":
1417 opts.SetLimit(int64(val.Int32()))
1418 case "max":
1419 opts.SetMax(val.Document())
1420 case "maxTimeMS":
1421 opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
1422 case "min":
1423 opts.SetMin(val.Document())
1424 case "noCursorTimeout":
1425 opts.SetNoCursorTimeout(val.Boolean())
1426 case "oplogReplay":
1427 opts.SetOplogReplay(val.Boolean())
1428 case "projection":
1429 opts.SetProjection(val.Document())
1430 case "returnKey":
1431 opts.SetReturnKey(val.Boolean())
1432 case "showRecordId":
1433 opts.SetShowRecordID(val.Boolean())
1434 case "skip":
1435 opts.SetSkip(int64(val.Int32()))
1436 case "snapshot":
1437 opts.SetSnapshot(val.Boolean())
1438 case "sort":
1439 opts.SetSort(val.Document())
1440 default:
1441 return nil, fmt.Errorf("unrecognized find option %q", key)
1442 }
1443 }
1444 if filter == nil {
1445 return nil, newMissingArgumentError("filter")
1446 }
1447
1448 cursor, err := coll.Find(ctx, filter, opts)
1449 res := &cursorResult{
1450 cursor: cursor,
1451 err: err,
1452 }
1453 return res, nil
1454 }
1455
View as plain text