1
2
3
4
5
6
7 package unified
8
9 import (
10 "context"
11 "fmt"
12
13 "go.mongodb.org/mongo-driver/bson"
14 "go.mongodb.org/mongo-driver/mongo"
15 "go.mongodb.org/mongo-driver/mongo/options"
16 )
17
18 func executeAbortTransaction(ctx context.Context, operation *operation) (*operationResult, error) {
19 sess, err := entities(ctx).session(operation.Object)
20 if err != nil {
21 return nil, err
22 }
23
24
25 elems, _ := operation.Arguments.Elements()
26 if len(elems) > 0 {
27 return nil, fmt.Errorf("unrecognized abortTransaction options %v", operation.Arguments)
28 }
29
30 return newErrorResult(sess.AbortTransaction(ctx)), nil
31 }
32
33 func executeEndSession(ctx context.Context, operation *operation) error {
34 sess, err := entities(ctx).session(operation.Object)
35 if err != nil {
36 return err
37 }
38
39
40 elems, _ := operation.Arguments.Elements()
41 if len(elems) > 0 {
42 return fmt.Errorf("unrecognized endSession options %v", operation.Arguments)
43 }
44
45 sess.EndSession(ctx)
46 return nil
47 }
48
49 func executeCommitTransaction(ctx context.Context, operation *operation) (*operationResult, error) {
50 sess, err := entities(ctx).session(operation.Object)
51 if err != nil {
52 return nil, err
53 }
54
55
56 elems, _ := operation.Arguments.Elements()
57 if len(elems) > 0 {
58 return nil, fmt.Errorf("unrecognized commitTransaction options %v", operation.Arguments)
59 }
60
61 return newErrorResult(sess.CommitTransaction(ctx)), nil
62 }
63
64 func executeStartTransaction(ctx context.Context, operation *operation) (*operationResult, error) {
65 sess, err := entities(ctx).session(operation.Object)
66 if err != nil {
67 return nil, err
68 }
69
70 opts := options.Transaction()
71 if operation.Arguments != nil {
72 var temp transactionOptions
73 if err := bson.Unmarshal(operation.Arguments, &temp); err != nil {
74 return nil, fmt.Errorf("error unmarshalling arguments to transactionOptions: %v", err)
75 }
76
77 opts = temp.TransactionOptions
78 }
79
80 return newErrorResult(sess.StartTransaction(opts)), nil
81 }
82
83 func executeWithTransaction(ctx context.Context, op *operation, loopDone <-chan struct{}) error {
84 sess, err := entities(ctx).session(op.Object)
85 if err != nil {
86 return err
87 }
88
89
90
91 callback, err := op.Arguments.LookupErr("callback")
92 if err != nil {
93 return newMissingArgumentError("callback")
94 }
95 var operations []*operation
96 if err := callback.Unmarshal(&operations); err != nil {
97 return fmt.Errorf("error transforming callback option to slice of operations: %v", err)
98 }
99
100
101 var temp transactionOptions
102 if err := bson.Unmarshal(removeFieldsFromDocument(op.Arguments, "callback"), &temp); err != nil {
103 return fmt.Errorf("error unmarshalling arguments to transactionOptions: %v", err)
104 }
105
106 _, err = sess.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) {
107 for idx, oper := range operations {
108 if err := oper.execute(ctx, loopDone); err != nil {
109 return nil, fmt.Errorf("error executing operation %q at index %d: %v", oper.Name, idx, err)
110 }
111 }
112 return nil, nil
113 }, temp.TransactionOptions)
114 return err
115 }
116
View as plain text