1
2
3
4
5
6
7 package unified
8
9 import (
10 "context"
11 "fmt"
12 "time"
13
14 "go.mongodb.org/mongo-driver/bson"
15 "go.mongodb.org/mongo-driver/internal/csot"
16 "go.mongodb.org/mongo-driver/mongo"
17 )
18
19 type operation struct {
20 Name string `bson:"name"`
21 Object string `bson:"object"`
22 Arguments bson.Raw `bson:"arguments"`
23 IgnoreResultAndError bool `bson:"ignoreResultAndError"`
24 ExpectedError *expectedError `bson:"expectError"`
25 ExpectedResult *bson.RawValue `bson:"expectResult"`
26 ResultEntityID *string `bson:"saveResultAsEntity"`
27 }
28
29
30
31 func (op *operation) execute(ctx context.Context, loopDone <-chan struct{}) error {
32 res, err := op.run(ctx, loopDone)
33 if err != nil {
34 return fmt.Errorf("execution failed: %v", err)
35 }
36
37 if op.IgnoreResultAndError {
38 return nil
39 }
40
41 if err := verifyOperationError(ctx, op.ExpectedError, res); err != nil {
42 return fmt.Errorf("error verification failed: %v", err)
43 }
44
45 if op.ExpectedResult != nil {
46 if err := verifyOperationResult(ctx, *op.ExpectedResult, res); err != nil {
47 return fmt.Errorf("result verification failed: %v", err)
48 }
49 }
50 return nil
51 }
52
53
54 func (op *operation) isCreateView() (bool, error) {
55 if op.Name != "createCollection" {
56 return false, nil
57 }
58
59 elements, err := op.Arguments.Elements()
60 if err != nil {
61 return false, err
62 }
63
64 var has bool
65 for _, elem := range elements {
66 if elem.Key() == "viewOn" {
67 has = true
68 break
69 }
70 }
71 return has, nil
72 }
73
74 func (op *operation) run(ctx context.Context, loopDone <-chan struct{}) (*operationResult, error) {
75 if op.Object == "testRunner" {
76
77 return newEmptyResult(), executeTestRunnerOperation(ctx, op, loopDone)
78 }
79
80
81 if id, ok := op.Arguments.Lookup("session").StringValueOK(); ok {
82 sess, err := entities(ctx).session(id)
83 if err != nil {
84 return nil, err
85 }
86 ctx = mongo.NewSessionContext(ctx, sess)
87
88
89
90 op.Arguments = removeFieldsFromDocument(op.Arguments, "session")
91 }
92
93
94 if tms, ok := op.Arguments.Lookup("timeoutMS").Int32OK(); ok {
95 timeout := time.Duration(tms) * time.Millisecond
96 newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, timeout)
97
98 ctx = newCtx
99
100 defer cancelFunc()
101
102
103
104 op.Arguments = removeFieldsFromDocument(op.Arguments, "timeoutMS")
105 }
106
107 switch op.Name {
108
109 case "abortTransaction":
110 return executeAbortTransaction(ctx, op)
111 case "commitTransaction":
112 return executeCommitTransaction(ctx, op)
113 case "endSession":
114
115 return newEmptyResult(), executeEndSession(ctx, op)
116 case "startTransaction":
117 return executeStartTransaction(ctx, op)
118 case "withTransaction":
119
120 return newEmptyResult(), executeWithTransaction(ctx, op, loopDone)
121
122
123 case "createChangeStream":
124 return executeCreateChangeStream(ctx, op)
125 case "listDatabases":
126 return executeListDatabases(ctx, op, false)
127 case "listDatabaseNames":
128 return executeListDatabases(ctx, op, true)
129
130
131 case "createCollection":
132 return executeCreateCollection(ctx, op)
133 case "dropCollection":
134 return executeDropCollection(ctx, op)
135 case "listCollections":
136 return executeListCollections(ctx, op)
137 case "listCollectionNames":
138 return executeListCollectionNames(ctx, op)
139 case "runCommand":
140 return executeRunCommand(ctx, op)
141 case "runCursorCommand":
142 return executeRunCursorCommand(ctx, op)
143 case "createCommandCursor":
144 return executeCreateRunCursorCommand(ctx, op)
145
146
147 case "aggregate":
148 return executeAggregate(ctx, op)
149 case "bulkWrite":
150 return executeBulkWrite(ctx, op)
151 case "countDocuments":
152 return executeCountDocuments(ctx, op)
153 case "createFindCursor":
154 return executeCreateFindCursor(ctx, op)
155 case "createIndex":
156 return executeCreateIndex(ctx, op)
157 case "createSearchIndex":
158 return executeCreateSearchIndex(ctx, op)
159 case "createSearchIndexes":
160 return executeCreateSearchIndexes(ctx, op)
161 case "deleteOne":
162 return executeDeleteOne(ctx, op)
163 case "deleteMany":
164 return executeDeleteMany(ctx, op)
165 case "distinct":
166 return executeDistinct(ctx, op)
167 case "dropIndex":
168 return executeDropIndex(ctx, op)
169 case "dropIndexes":
170 return executeDropIndexes(ctx, op)
171 case "dropSearchIndex":
172 return executeDropSearchIndex(ctx, op)
173 case "estimatedDocumentCount":
174 return executeEstimatedDocumentCount(ctx, op)
175 case "find":
176 return executeFind(ctx, op)
177 case "findOne":
178 return executeFindOne(ctx, op)
179 case "findOneAndDelete":
180 return executeFindOneAndDelete(ctx, op)
181 case "findOneAndReplace":
182 return executeFindOneAndReplace(ctx, op)
183 case "findOneAndUpdate":
184 return executeFindOneAndUpdate(ctx, op)
185 case "insertMany":
186 return executeInsertMany(ctx, op)
187 case "insertOne":
188 return executeInsertOne(ctx, op)
189 case "listIndexes":
190 return executeListIndexes(ctx, op)
191 case "listSearchIndexes":
192 return executeListSearchIndexes(ctx, op)
193 case "rename":
194
195 if _, err := entities(ctx).collection(op.Object); err == nil {
196 return executeRenameCollection(ctx, op)
197 }
198 if _, err := entities(ctx).gridFSBucket(op.Object); err == nil {
199 return executeBucketRename(ctx, op)
200 }
201 return nil, fmt.Errorf("failed to find a collection or GridFS bucket named %q", op.Object)
202 case "replaceOne":
203 return executeReplaceOne(ctx, op)
204 case "updateOne":
205 return executeUpdateOne(ctx, op)
206 case "updateMany":
207 return executeUpdateMany(ctx, op)
208 case "updateSearchIndex":
209 return executeUpdateSearchIndex(ctx, op)
210
211
212 case "delete":
213 return executeBucketDelete(ctx, op)
214 case "downloadByName":
215 return executeBucketDownloadByName(ctx, op)
216 case "download":
217 return executeBucketDownload(ctx, op)
218 case "drop":
219 return executeBucketDrop(ctx, op)
220 case "upload":
221 return executeBucketUpload(ctx, op)
222
223
224 case "close":
225 if cursor, err := entities(ctx).cursor(op.Object); err == nil {
226 _ = cursor.Close(ctx)
227
228 return newEmptyResult(), nil
229 }
230
231 if clientEntity, err := entities(ctx).client(op.Object); err == nil {
232 _ = clientEntity.disconnect(context.Background())
233
234 return newEmptyResult(), nil
235 }
236
237 return nil, fmt.Errorf("failed to find a cursor or client named %q", op.Object)
238 case "iterateOnce":
239 return executeIterateOnce(ctx, op)
240 case "iterateUntilDocumentOrError":
241 return executeIterateUntilDocumentOrError(ctx, op)
242
243
244 case "createDataKey":
245 return executeCreateDataKey(ctx, op)
246 case "rewrapManyDataKey":
247 return executeRewrapManyDataKey(ctx, op)
248 case "removeKeyAltName":
249 return executeRemoveKeyAltName(ctx, op)
250 case "getKeys":
251 return executeGetKeys(ctx, op)
252 case "getKeyByAltName":
253 return executeGetKeyByAltName(ctx, op)
254 case "getKey":
255 return executeGetKey(ctx, op)
256 case "deleteKey":
257 return executeDeleteKey(ctx, op)
258 case "addKeyAltName":
259 return executeAddKeyAltName(ctx, op)
260
261
262 case "count", "listIndexNames", "modifyCollection":
263 return nil, newSkipTestError(fmt.Sprintf("the %q operation is not supported", op.Name))
264 default:
265 return nil, fmt.Errorf("unrecognized entity operation %q", op.Name)
266 }
267 }
268
View as plain text