1
2
3
4
5
6
7 package mongo
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "time"
14
15 "go.mongodb.org/mongo-driver/bson"
16 "go.mongodb.org/mongo-driver/bson/bsoncodec"
17 "go.mongodb.org/mongo-driver/internal/csfle"
18 "go.mongodb.org/mongo-driver/mongo/description"
19 "go.mongodb.org/mongo-driver/mongo/options"
20 "go.mongodb.org/mongo-driver/mongo/readconcern"
21 "go.mongodb.org/mongo-driver/mongo/readpref"
22 "go.mongodb.org/mongo-driver/mongo/writeconcern"
23 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
24 "go.mongodb.org/mongo-driver/x/mongo/driver"
25 "go.mongodb.org/mongo-driver/x/mongo/driver/operation"
26 "go.mongodb.org/mongo-driver/x/mongo/driver/session"
27 )
28
29 var (
30 defaultRunCmdOpts = []*options.RunCmdOptions{options.RunCmd().SetReadPreference(readpref.Primary())}
31 )
32
33
34 type Database struct {
35 client *Client
36 name string
37 readConcern *readconcern.ReadConcern
38 writeConcern *writeconcern.WriteConcern
39 readPreference *readpref.ReadPref
40 readSelector description.ServerSelector
41 writeSelector description.ServerSelector
42 bsonOpts *options.BSONOptions
43 registry *bsoncodec.Registry
44 }
45
46 func newDatabase(client *Client, name string, opts ...*options.DatabaseOptions) *Database {
47 dbOpt := options.MergeDatabaseOptions(opts...)
48
49 rc := client.readConcern
50 if dbOpt.ReadConcern != nil {
51 rc = dbOpt.ReadConcern
52 }
53
54 rp := client.readPreference
55 if dbOpt.ReadPreference != nil {
56 rp = dbOpt.ReadPreference
57 }
58
59 wc := client.writeConcern
60 if dbOpt.WriteConcern != nil {
61 wc = dbOpt.WriteConcern
62 }
63
64 bsonOpts := client.bsonOpts
65 if dbOpt.BSONOptions != nil {
66 bsonOpts = dbOpt.BSONOptions
67 }
68
69 reg := client.registry
70 if dbOpt.Registry != nil {
71 reg = dbOpt.Registry
72 }
73
74 db := &Database{
75 client: client,
76 name: name,
77 readPreference: rp,
78 readConcern: rc,
79 writeConcern: wc,
80 bsonOpts: bsonOpts,
81 registry: reg,
82 }
83
84 db.readSelector = description.CompositeSelector([]description.ServerSelector{
85 description.ReadPrefSelector(db.readPreference),
86 description.LatencySelector(db.client.localThreshold),
87 })
88
89 db.writeSelector = description.CompositeSelector([]description.ServerSelector{
90 description.WriteSelector(),
91 description.LatencySelector(db.client.localThreshold),
92 })
93
94 return db
95 }
96
97
98 func (db *Database) Client() *Client {
99 return db.client
100 }
101
102
103 func (db *Database) Name() string {
104 return db.name
105 }
106
107
108 func (db *Database) Collection(name string, opts ...*options.CollectionOptions) *Collection {
109 return newCollection(db, name, opts...)
110 }
111
112
113
114
115
116
117
118
119
120
121
122
123
124 func (db *Database) Aggregate(ctx context.Context, pipeline interface{},
125 opts ...*options.AggregateOptions) (*Cursor, error) {
126 a := aggregateParams{
127 ctx: ctx,
128 pipeline: pipeline,
129 client: db.client,
130 registry: db.registry,
131 readConcern: db.readConcern,
132 writeConcern: db.writeConcern,
133 retryRead: db.client.retryReads,
134 db: db.name,
135 readSelector: db.readSelector,
136 writeSelector: db.writeSelector,
137 readPreference: db.readPreference,
138 opts: opts,
139 }
140 return aggregate(a)
141 }
142
143 func (db *Database) processRunCommand(ctx context.Context, cmd interface{},
144 cursorCommand bool, opts ...*options.RunCmdOptions) (*operation.Command, *session.Client, error) {
145 sess := sessionFromContext(ctx)
146 if sess == nil && db.client.sessionPool != nil {
147 sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
148 }
149
150 err := db.client.validSession(sess)
151 if err != nil {
152 return nil, sess, err
153 }
154
155 ro := options.MergeRunCmdOptions(append(defaultRunCmdOpts, opts...)...)
156 if sess != nil && sess.TransactionRunning() && ro.ReadPreference != nil && ro.ReadPreference.Mode() != readpref.PrimaryMode {
157 return nil, sess, errors.New("read preference in a transaction must be primary")
158 }
159
160 if isUnorderedMap(cmd) {
161 return nil, sess, ErrMapForOrderedArgument{"cmd"}
162 }
163
164 runCmdDoc, err := marshal(cmd, db.bsonOpts, db.registry)
165 if err != nil {
166 return nil, sess, err
167 }
168 readSelect := description.CompositeSelector([]description.ServerSelector{
169 description.ReadPrefSelector(ro.ReadPreference),
170 description.LatencySelector(db.client.localThreshold),
171 })
172 if sess != nil && sess.PinnedServer != nil {
173 readSelect = makePinnedSelector(sess, readSelect)
174 }
175
176 var op *operation.Command
177 switch cursorCommand {
178 case true:
179 cursorOpts := db.client.createBaseCursorOptions()
180
181 cursorOpts.MarshalValueEncoderFn = newEncoderFn(db.bsonOpts, db.registry)
182
183 op = operation.NewCursorCommand(runCmdDoc, cursorOpts)
184 default:
185 op = operation.NewCommand(runCmdDoc)
186 }
187
188 return op.Session(sess).CommandMonitor(db.client.monitor).
189 ServerSelector(readSelect).ClusterClock(db.client.clock).
190 Database(db.name).Deployment(db.client.deployment).
191 Crypt(db.client.cryptFLE).ReadPreference(ro.ReadPreference).ServerAPI(db.client.serverAPI).
192 Timeout(db.client.timeout).Logger(db.client.logger), sess, nil
193 }
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213 func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) *SingleResult {
214 if ctx == nil {
215 ctx = context.Background()
216 }
217
218 op, sess, err := db.processRunCommand(ctx, runCommand, false, opts...)
219 defer closeImplicitSession(sess)
220 if err != nil {
221 return &SingleResult{err: err}
222 }
223
224 err = op.Execute(ctx)
225
226 _, convErr := processWriteError(err)
227 return &SingleResult{
228 ctx: ctx,
229 err: convErr,
230 rdr: bson.Raw(op.Result()),
231 bsonOpts: db.bsonOpts,
232 reg: db.registry,
233 }
234 }
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250 func (db *Database) RunCommandCursor(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) (*Cursor, error) {
251 if ctx == nil {
252 ctx = context.Background()
253 }
254
255 op, sess, err := db.processRunCommand(ctx, runCommand, true, opts...)
256 if err != nil {
257 closeImplicitSession(sess)
258 return nil, replaceErrors(err)
259 }
260
261 if err = op.Execute(ctx); err != nil {
262 closeImplicitSession(sess)
263 if errors.Is(err, driver.ErrNoCursor) {
264 return nil, errors.New(
265 "database response does not contain a cursor; try using RunCommand instead")
266 }
267 return nil, replaceErrors(err)
268 }
269
270 bc, err := op.ResultCursor()
271 if err != nil {
272 closeImplicitSession(sess)
273 return nil, replaceErrors(err)
274 }
275 cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess)
276 return cursor, replaceErrors(err)
277 }
278
279
280
281 func (db *Database) Drop(ctx context.Context) error {
282 if ctx == nil {
283 ctx = context.Background()
284 }
285
286 sess := sessionFromContext(ctx)
287 if sess == nil && db.client.sessionPool != nil {
288 sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
289 defer sess.EndSession()
290 }
291
292 err := db.client.validSession(sess)
293 if err != nil {
294 return err
295 }
296
297 wc := db.writeConcern
298 if sess.TransactionRunning() {
299 wc = nil
300 }
301 if !writeconcern.AckWrite(wc) {
302 sess = nil
303 }
304
305 selector := makePinnedSelector(sess, db.writeSelector)
306
307 op := operation.NewDropDatabase().
308 Session(sess).WriteConcern(wc).CommandMonitor(db.client.monitor).
309 ServerSelector(selector).ClusterClock(db.client.clock).
310 Database(db.name).Deployment(db.client.deployment).Crypt(db.client.cryptFLE).
311 ServerAPI(db.client.serverAPI)
312
313 err = op.Execute(ctx)
314
315 driverErr, ok := err.(driver.Error)
316 if err != nil && (!ok || !driverErr.NamespaceNotFound()) {
317 return replaceErrors(err)
318 }
319 return nil
320 }
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336 func (db *Database) ListCollectionSpecifications(ctx context.Context, filter interface{},
337 opts ...*options.ListCollectionsOptions) ([]*CollectionSpecification, error) {
338
339 cursor, err := db.ListCollections(ctx, filter, opts...)
340 if err != nil {
341 return nil, err
342 }
343
344 var specs []*CollectionSpecification
345 err = cursor.All(ctx, &specs)
346 if err != nil {
347 return nil, err
348 }
349
350 for _, spec := range specs {
351
352
353 if spec.IDIndex != nil && spec.IDIndex.Namespace == "" {
354 spec.IDIndex.Namespace = db.name + "." + spec.Name
355 }
356 }
357 return specs, nil
358 }
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373 func (db *Database) ListCollections(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) (*Cursor, error) {
374 if ctx == nil {
375 ctx = context.Background()
376 }
377
378 filterDoc, err := marshal(filter, db.bsonOpts, db.registry)
379 if err != nil {
380 return nil, err
381 }
382
383 sess := sessionFromContext(ctx)
384 if sess == nil && db.client.sessionPool != nil {
385 sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
386 }
387
388 err = db.client.validSession(sess)
389 if err != nil {
390 closeImplicitSession(sess)
391 return nil, err
392 }
393
394 selector := description.CompositeSelector([]description.ServerSelector{
395 description.ReadPrefSelector(readpref.Primary()),
396 description.LatencySelector(db.client.localThreshold),
397 })
398 selector = makeReadPrefSelector(sess, selector, db.client.localThreshold)
399
400 lco := options.MergeListCollectionsOptions(opts...)
401 op := operation.NewListCollections(filterDoc).
402 Session(sess).ReadPreference(db.readPreference).CommandMonitor(db.client.monitor).
403 ServerSelector(selector).ClusterClock(db.client.clock).
404 Database(db.name).Deployment(db.client.deployment).Crypt(db.client.cryptFLE).
405 ServerAPI(db.client.serverAPI).Timeout(db.client.timeout)
406
407 cursorOpts := db.client.createBaseCursorOptions()
408
409 cursorOpts.MarshalValueEncoderFn = newEncoderFn(db.bsonOpts, db.registry)
410
411 if lco.NameOnly != nil {
412 op = op.NameOnly(*lco.NameOnly)
413 }
414 if lco.BatchSize != nil {
415 cursorOpts.BatchSize = *lco.BatchSize
416 op = op.BatchSize(*lco.BatchSize)
417 }
418 if lco.AuthorizedCollections != nil {
419 op = op.AuthorizedCollections(*lco.AuthorizedCollections)
420 }
421
422 retry := driver.RetryNone
423 if db.client.retryReads {
424 retry = driver.RetryOncePerCommand
425 }
426 op = op.Retry(retry)
427
428 err = op.Execute(ctx)
429 if err != nil {
430 closeImplicitSession(sess)
431 return nil, replaceErrors(err)
432 }
433
434 bc, err := op.Result(cursorOpts)
435 if err != nil {
436 closeImplicitSession(sess)
437 return nil, replaceErrors(err)
438 }
439 cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess)
440 return cursor, replaceErrors(err)
441 }
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457 func (db *Database) ListCollectionNames(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) ([]string, error) {
458 opts = append(opts, options.ListCollections().SetNameOnly(true))
459
460 res, err := db.ListCollections(ctx, filter, opts...)
461 if err != nil {
462 return nil, err
463 }
464
465 defer res.Close(ctx)
466
467 names := make([]string, 0)
468 for res.Next(ctx) {
469 elem, err := res.Current.LookupErr("name")
470 if err != nil {
471 return nil, err
472 }
473
474 if elem.Type != bson.TypeString {
475 return nil, fmt.Errorf("incorrect type for 'name'. got %v. want %v", elem.Type, bson.TypeString)
476 }
477
478 elemName := elem.StringValue()
479 names = append(names, elemName)
480 }
481
482 res.Close(ctx)
483 return names, nil
484 }
485
486
487 func (db *Database) ReadConcern() *readconcern.ReadConcern {
488 return db.readConcern
489 }
490
491
492 func (db *Database) ReadPreference() *readpref.ReadPref {
493 return db.readPreference
494 }
495
496
497 func (db *Database) WriteConcern() *writeconcern.WriteConcern {
498 return db.writeConcern
499 }
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514 func (db *Database) Watch(ctx context.Context, pipeline interface{},
515 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
516
517 csConfig := changeStreamConfig{
518 readConcern: db.readConcern,
519 readPreference: db.readPreference,
520 client: db.client,
521 registry: db.registry,
522 streamType: DatabaseStream,
523 databaseName: db.Name(),
524 crypt: db.client.cryptFLE,
525 }
526 return newChangeStream(ctx, csConfig, pipeline, opts...)
527 }
528
529
530
531
532
533
534
535
536
537 func (db *Database) CreateCollection(ctx context.Context, name string, opts ...*options.CreateCollectionOptions) error {
538 cco := options.MergeCreateCollectionOptions(opts...)
539
540
541 ef := cco.EncryptedFields
542
543 if ef == nil {
544 ef = db.getEncryptedFieldsFromMap(name)
545 }
546 if ef != nil {
547 return db.createCollectionWithEncryptedFields(ctx, name, ef, opts...)
548 }
549
550 return db.createCollection(ctx, name, opts...)
551 }
552
553
554
555 func (db *Database) getEncryptedFieldsFromServer(ctx context.Context, collectionName string) (interface{}, error) {
556
557 collSpecs, err := db.ListCollectionSpecifications(ctx, bson.D{{"name", collectionName}})
558 if err != nil {
559 return nil, err
560 }
561 if len(collSpecs) == 0 {
562 return nil, nil
563 }
564 if len(collSpecs) > 1 {
565 return nil, fmt.Errorf("expected 1 or 0 results from listCollections, got %v", len(collSpecs))
566 }
567 collSpec := collSpecs[0]
568 rawValue, err := collSpec.Options.LookupErr("encryptedFields")
569 if errors.Is(err, bsoncore.ErrElementNotFound) {
570 return nil, nil
571 } else if err != nil {
572 return nil, err
573 }
574
575 encryptedFields, ok := rawValue.DocumentOK()
576 if !ok {
577 return nil, fmt.Errorf("expected encryptedFields of %v to be document, got %v", collectionName, rawValue.Type)
578 }
579
580 return encryptedFields, nil
581 }
582
583
584
585 func (db *Database) getEncryptedFieldsFromMap(collectionName string) interface{} {
586
587 efMap := db.client.encryptedFieldsMap
588 if efMap == nil {
589 return nil
590 }
591
592 namespace := db.name + "." + collectionName
593
594 ef, ok := efMap[namespace]
595 if ok {
596 return ef
597 }
598 return nil
599 }
600
601
602 func (db *Database) createCollectionWithEncryptedFields(ctx context.Context, name string, ef interface{}, opts ...*options.CreateCollectionOptions) error {
603 efBSON, err := marshal(ef, db.bsonOpts, db.registry)
604 if err != nil {
605 return fmt.Errorf("error transforming document: %w", err)
606 }
607
608
609
610
611 {
612 const QEv2WireVersion = 21
613 server, err := db.client.deployment.SelectServer(ctx, description.WriteSelector())
614 if err != nil {
615 return fmt.Errorf("error selecting server to check maxWireVersion: %w", err)
616 }
617 conn, err := server.Connection(ctx)
618 if err != nil {
619 return fmt.Errorf("error getting connection to check maxWireVersion: %w", err)
620 }
621 defer conn.Close()
622 wireVersionRange := conn.Description().WireVersion
623 if wireVersionRange.Max < QEv2WireVersion {
624 return fmt.Errorf("Driver support of Queryable Encryption is incompatible with server. Upgrade server to use Queryable Encryption. Got maxWireVersion %v but need maxWireVersion >= %v", wireVersionRange.Max, QEv2WireVersion)
625 }
626 }
627
628
629
630 stateCollectionOpts := options.CreateCollection().
631 SetClusteredIndex(bson.D{{"key", bson.D{{"_id", 1}}}, {"unique", true}})
632
633 escCollection, err := csfle.GetEncryptedStateCollectionName(efBSON, name, csfle.EncryptedStateCollection)
634 if err != nil {
635 return err
636 }
637
638 if err := db.createCollection(ctx, escCollection, stateCollectionOpts); err != nil {
639 return err
640 }
641
642
643 ecocCollection, err := csfle.GetEncryptedStateCollectionName(efBSON, name, csfle.EncryptedCompactionCollection)
644 if err != nil {
645 return err
646 }
647
648 if err := db.createCollection(ctx, ecocCollection, stateCollectionOpts); err != nil {
649 return err
650 }
651
652
653 op, err := db.createCollectionOperation(name, opts...)
654 if err != nil {
655 return err
656 }
657
658 op.EncryptedFields(efBSON)
659 if err := db.executeCreateOperation(ctx, op); err != nil {
660 return err
661 }
662
663
664 if _, err := db.Collection(name).Indexes().CreateOne(ctx, IndexModel{Keys: bson.D{{"__safeContent__", 1}}}); err != nil {
665 return fmt.Errorf("error creating safeContent index: %w", err)
666 }
667
668 return nil
669 }
670
671
672 func (db *Database) createCollection(ctx context.Context, name string, opts ...*options.CreateCollectionOptions) error {
673 op, err := db.createCollectionOperation(name, opts...)
674 if err != nil {
675 return err
676 }
677 return db.executeCreateOperation(ctx, op)
678 }
679
680 func (db *Database) createCollectionOperation(name string, opts ...*options.CreateCollectionOptions) (*operation.Create, error) {
681 cco := options.MergeCreateCollectionOptions(opts...)
682 op := operation.NewCreate(name).ServerAPI(db.client.serverAPI)
683
684 if cco.Capped != nil {
685 op.Capped(*cco.Capped)
686 }
687 if cco.Collation != nil {
688 op.Collation(bsoncore.Document(cco.Collation.ToDocument()))
689 }
690 if cco.ChangeStreamPreAndPostImages != nil {
691 csppi, err := marshal(cco.ChangeStreamPreAndPostImages, db.bsonOpts, db.registry)
692 if err != nil {
693 return nil, err
694 }
695 op.ChangeStreamPreAndPostImages(csppi)
696 }
697 if cco.DefaultIndexOptions != nil {
698 idx, doc := bsoncore.AppendDocumentStart(nil)
699 if cco.DefaultIndexOptions.StorageEngine != nil {
700 storageEngine, err := marshal(cco.DefaultIndexOptions.StorageEngine, db.bsonOpts, db.registry)
701 if err != nil {
702 return nil, err
703 }
704
705 doc = bsoncore.AppendDocumentElement(doc, "storageEngine", storageEngine)
706 }
707 doc, err := bsoncore.AppendDocumentEnd(doc, idx)
708 if err != nil {
709 return nil, err
710 }
711
712 op.IndexOptionDefaults(doc)
713 }
714 if cco.MaxDocuments != nil {
715 op.Max(*cco.MaxDocuments)
716 }
717 if cco.SizeInBytes != nil {
718 op.Size(*cco.SizeInBytes)
719 }
720 if cco.StorageEngine != nil {
721 storageEngine, err := marshal(cco.StorageEngine, db.bsonOpts, db.registry)
722 if err != nil {
723 return nil, err
724 }
725 op.StorageEngine(storageEngine)
726 }
727 if cco.ValidationAction != nil {
728 op.ValidationAction(*cco.ValidationAction)
729 }
730 if cco.ValidationLevel != nil {
731 op.ValidationLevel(*cco.ValidationLevel)
732 }
733 if cco.Validator != nil {
734 validator, err := marshal(cco.Validator, db.bsonOpts, db.registry)
735 if err != nil {
736 return nil, err
737 }
738 op.Validator(validator)
739 }
740 if cco.ExpireAfterSeconds != nil {
741 op.ExpireAfterSeconds(*cco.ExpireAfterSeconds)
742 }
743 if cco.TimeSeriesOptions != nil {
744 idx, doc := bsoncore.AppendDocumentStart(nil)
745 doc = bsoncore.AppendStringElement(doc, "timeField", cco.TimeSeriesOptions.TimeField)
746
747 if cco.TimeSeriesOptions.MetaField != nil {
748 doc = bsoncore.AppendStringElement(doc, "metaField", *cco.TimeSeriesOptions.MetaField)
749 }
750 if cco.TimeSeriesOptions.Granularity != nil {
751 doc = bsoncore.AppendStringElement(doc, "granularity", *cco.TimeSeriesOptions.Granularity)
752 }
753
754 if cco.TimeSeriesOptions.BucketMaxSpan != nil {
755 bmss := int64(*cco.TimeSeriesOptions.BucketMaxSpan / time.Second)
756
757 doc = bsoncore.AppendInt64Element(doc, "bucketMaxSpanSeconds", bmss)
758 }
759
760 if cco.TimeSeriesOptions.BucketRounding != nil {
761 brs := int64(*cco.TimeSeriesOptions.BucketRounding / time.Second)
762
763 doc = bsoncore.AppendInt64Element(doc, "bucketRoundingSeconds", brs)
764 }
765
766 doc, err := bsoncore.AppendDocumentEnd(doc, idx)
767 if err != nil {
768 return nil, err
769 }
770
771 op.TimeSeries(doc)
772 }
773 if cco.ClusteredIndex != nil {
774 clusteredIndex, err := marshal(cco.ClusteredIndex, db.bsonOpts, db.registry)
775 if err != nil {
776 return nil, err
777 }
778 op.ClusteredIndex(clusteredIndex)
779 }
780
781 return op, nil
782 }
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797 func (db *Database) CreateView(ctx context.Context, viewName, viewOn string, pipeline interface{},
798 opts ...*options.CreateViewOptions) error {
799
800 pipelineArray, _, err := marshalAggregatePipeline(pipeline, db.bsonOpts, db.registry)
801 if err != nil {
802 return err
803 }
804
805 op := operation.NewCreate(viewName).
806 ViewOn(viewOn).
807 Pipeline(pipelineArray).
808 ServerAPI(db.client.serverAPI)
809 cvo := options.MergeCreateViewOptions(opts...)
810 if cvo.Collation != nil {
811 op.Collation(bsoncore.Document(cvo.Collation.ToDocument()))
812 }
813
814 return db.executeCreateOperation(ctx, op)
815 }
816
817 func (db *Database) executeCreateOperation(ctx context.Context, op *operation.Create) error {
818 sess := sessionFromContext(ctx)
819 if sess == nil && db.client.sessionPool != nil {
820 sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
821 defer sess.EndSession()
822 }
823
824 err := db.client.validSession(sess)
825 if err != nil {
826 return err
827 }
828
829 wc := db.writeConcern
830 if sess.TransactionRunning() {
831 wc = nil
832 }
833 if !writeconcern.AckWrite(wc) {
834 sess = nil
835 }
836
837 selector := makePinnedSelector(sess, db.writeSelector)
838 op = op.Session(sess).
839 WriteConcern(wc).
840 CommandMonitor(db.client.monitor).
841 ServerSelector(selector).
842 ClusterClock(db.client.clock).
843 Database(db.name).
844 Deployment(db.client.deployment).
845 Crypt(db.client.cryptFLE)
846
847 return replaceErrors(op.Execute(ctx))
848 }
849
View as plain text