1
2
3
4
5
6
7 package unified
8
9 import (
10 "context"
11 "fmt"
12 "io/ioutil"
13 "path"
14 "strings"
15 "testing"
16 "time"
17
18 "go.mongodb.org/mongo-driver/bson"
19 "go.mongodb.org/mongo-driver/internal/assert"
20 "go.mongodb.org/mongo-driver/internal/spectest"
21 "go.mongodb.org/mongo-driver/mongo"
22 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
23 )
24
// Skip reasons that are shared by more than one entry of
// skippedTestDescriptions.
const (
	skipReasonPoolClearTimeouts  = "Godriver clears after multiple timeout"
	skipReasonSnapshotFlaky      = "Test fails frequently. See GODRIVER-2843"
	skipReasonSearchIndexConcern = "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043"
	skipReasonUnpinOnAbort       = "Implement GODRIVER-3034"
	skipReasonMaxTimeMSDisabled  = "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722."
)

var (
	// skippedTestDescriptions maps the description of a test case to the
	// reason it must not run. TestCase.Run looks up each test case by its
	// description and skips it when an entry exists.
	skippedTestDescriptions = map[string]string{
		// Tracked by GODRIVER-1773.
		"A successful find event with a getmore and the server kills the cursor (<= 4.4)": "See GODRIVER-1773",

		// Connection pool clearing with interruptInUseConnections=true.
		"Connection pool clear uses interruptInUseConnections=true after monitor timeout":                      skipReasonPoolClearTimeouts,
		"Error returned from connection pool clear with interruptInUseConnections=true is retryable":           skipReasonPoolClearTimeouts,
		"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": skipReasonPoolClearTimeouts,

		// Snapshot session tests recorded as failing frequently (GODRIVER-2843).
		"Find operation with snapshot":                                      skipReasonSnapshotFlaky,
		"Write commands with snapshot session do not affect snapshot reads": skipReasonSnapshotFlaky,

		// Search index tests synced by GODRIVER-3074 but affected by GODRIVER-3043.
		"dropSearchIndex ignores read and write concern":       skipReasonSearchIndexConcern,
		"listSearchIndexes ignores read and write concern":     skipReasonSearchIndexConcern,
		"updateSearchIndex ignores the read and write concern": skipReasonSearchIndexConcern,

		// Unpinning on commit, pending GODRIVER-3137.
		"unpin after TransientTransactionError error on commit": "Implement GODRIVER-3137",

		// Unpinning in the remaining situations, pending GODRIVER-3034.
		"unpin on successful abort":                                   skipReasonUnpinOnAbort,
		"unpin after non-transient error on abort":                    skipReasonUnpinOnAbort,
		"unpin after TransientTransactionError error on abort":        skipReasonUnpinOnAbort,
		"unpin when a new transaction is started":                     skipReasonUnpinOnAbort,
		"unpin when a non-transaction write operation uses a session": skipReasonUnpinOnAbort,
		"unpin when a non-transaction read operation uses a session":  skipReasonUnpinOnAbort,

		// timeoutMS tests that depend on maxTimeMS for find and aggregate
		// (DRIVERS-2722).
		"timeoutMS can be overridden for a find":                                               skipReasonMaxTimeMSDisabled,
		"timeoutMS can be configured for an operation - find on collection":                    skipReasonMaxTimeMSDisabled,
		"timeoutMS can be configured for an operation - aggregate on collection":               skipReasonMaxTimeMSDisabled,
		"timeoutMS can be configured for an operation - aggregate on database":                 skipReasonMaxTimeMSDisabled,
		"operation is retried multiple times for non-zero timeoutMS - find on collection":      skipReasonMaxTimeMSDisabled,
		"operation is retried multiple times for non-zero timeoutMS - aggregate on collection": skipReasonMaxTimeMSDisabled,
		"operation is retried multiple times for non-zero timeoutMS - aggregate on database":   skipReasonMaxTimeMSDisabled,
	}

	// logMessageValidatorTimeout bounds how long TestCase.Run waits for the
	// log message validators to stop.
	logMessageValidatorTimeout = 10 * time.Millisecond

	// lowHeartbeatFrequency is the heartbeat frequency TestCase.Run applies to
	// client entities when hasOperationalFailpoint reports true.
	lowHeartbeatFrequency = 50 * time.Millisecond
)
74
75
76 type TestCase struct {
77 Description string `bson:"description"`
78 RunOnRequirements []mtest.RunOnBlock `bson:"runOnRequirements"`
79 SkipReason *string `bson:"skipReason"`
80 Operations []*operation `bson:"operations"`
81 ExpectedEvents []*expectedEvents `bson:"expectEvents"`
82 ExpectLogMessages []*clientLogMessages `bson:"expectLogMessages"`
83 Outcome []*collectionData `bson:"outcome"`
84
85 initialData []*collectionData
86 createEntities []map[string]*entityOptions
87 killAllSessions bool
88 schemaVersion string
89
90 entities *EntityMap
91 loopDone chan struct{}
92 }
93
94 func (tc *TestCase) performsDistinct() bool {
95 return tc.performsOperation("distinct")
96 }
97
98 func (tc *TestCase) setsFailPoint() bool {
99 return tc.performsOperation("failPoint")
100 }
101
102 func (tc *TestCase) startsTransaction() bool {
103 return tc.performsOperation("startTransaction")
104 }
105
106 func (tc *TestCase) performsOperation(name string) bool {
107 for _, op := range tc.Operations {
108 if op.Name == name {
109 return true
110 }
111 }
112 return false
113 }
114
115
116 type TestFile struct {
117 Description string `bson:"description"`
118 SchemaVersion string `bson:"schemaVersion"`
119 RunOnRequirements []mtest.RunOnBlock `bson:"runOnRequirements"`
120 CreateEntities []map[string]*entityOptions `bson:"createEntities"`
121 InitialData []*collectionData `bson:"initialData"`
122 TestCases []*TestCase `bson:"tests"`
123 }
124
125
126
127 func runTestDirectory(t *testing.T, directoryPath string, expectValidFail bool) {
128 for _, filename := range spectest.FindJSONFilesInDir(t, directoryPath) {
129 t.Run(filename, func(t *testing.T) {
130 runTestFile(t, path.Join(directoryPath, filename), expectValidFail)
131 })
132 }
133 }
134
135
136 func runTestFile(t *testing.T, filepath string, expectValidFail bool, opts ...*Options) {
137 content, err := ioutil.ReadFile(filepath)
138 assert.Nil(t, err, "ReadFile error for file %q: %v", filepath, err)
139
140 fileReqs, testCases := ParseTestFile(t, content, expectValidFail, opts...)
141
142 mtOpts := mtest.NewOptions().
143 RunOn(fileReqs...).
144 CreateClient(false)
145 mt := mtest.New(t, mtOpts)
146
147 for _, testCase := range testCases {
148 mtOpts := mtest.NewOptions().
149 RunOn(testCase.RunOnRequirements...).
150 CreateClient(false)
151
152 mt.RunOpts(testCase.Description, mtOpts, func(mt *mtest.T) {
153 defer func() {
154
155 if r := recover(); r != nil {
156 if !expectValidFail {
157 mt.Fatal(r)
158 }
159 }
160 }()
161 err := testCase.Run(mt)
162 if expectValidFail {
163 if err != nil {
164 return
165 }
166 mt.Fatalf("expected test to error, got nil")
167 }
168 if err != nil {
169 mt.Fatal(err)
170 }
171 })
172 }
173 }
174
175 func parseTestFile(testJSON []byte, opts ...*Options) ([]mtest.RunOnBlock, []*TestCase, error) {
176 var testFile TestFile
177 if err := bson.UnmarshalExtJSON(testJSON, false, &testFile); err != nil {
178 return nil, nil, err
179 }
180
181 op := MergeOptions(opts...)
182 for _, testCase := range testFile.TestCases {
183 testCase.initialData = testFile.InitialData
184 testCase.createEntities = testFile.CreateEntities
185 testCase.schemaVersion = testFile.SchemaVersion
186 testCase.entities = newEntityMap()
187 testCase.loopDone = make(chan struct{})
188 testCase.killAllSessions = *op.RunKillAllSessions
189 }
190
191 return testFile.RunOnRequirements, testFile.TestCases, nil
192 }
193
194
195 func ParseTestFile(t *testing.T, testJSON []byte, expectValidFail bool, opts ...*Options) ([]mtest.RunOnBlock, []*TestCase) {
196 t.Helper()
197
198 runOnRequirements, testCases, err := parseTestFile(testJSON, opts...)
199
200 if !expectValidFail {
201 assert.NoError(t, err, "error parsing test file")
202 }
203
204 return runOnRequirements, testCases
205 }
206
207
208
209 func (tc *TestCase) GetEntities() *EntityMap {
210 return tc.entities
211 }
212
213
214
215 func (tc *TestCase) EndLoop() {
216 tc.loopDone <- struct{}{}
217 }
218
219
// LoggerSkipper is the set of logging and skipping methods TestCase.Run needs
// from its caller. The method signatures match those of *testing.T, and
// runTestFile passes an *mtest.T.
type LoggerSkipper interface {
	// Log records args in the test log.
	Log(args ...interface{})
	// Logf records a formatted message in the test log.
	Logf(format string, args ...interface{})
	// Skip marks the test as skipped, giving args as the reason.
	Skip(args ...interface{})
	// Skipf marks the test as skipped, giving a formatted reason.
	Skipf(format string, args ...interface{})
}
226
227
228
// skipTestError signals that the running test must be skipped rather than
// failed. reason says why.
type skipTestError struct {
	reason string
}

// Error implements the error interface. The message always starts with
// "test must be skipped", the marker isSkipTestError looks for, followed by
// the quoted reason.
func (s skipTestError) Error() string {
	return fmt.Sprintf("test must be skipped: %q", s.reason)
}

// newSkipTestError returns a *skipTestError carrying reason.
func newSkipTestError(reason string) error {
	return &skipTestError{reason: reason}
}

// isSkipTestError reports whether err is non-nil and its message contains the
// skip marker.
//
// The check is purely textual. It therefore still matches after a
// skipTestError has been embedded in another error's message, for example
// with fmt.Errorf and %v, and it also matches any unrelated error whose
// message happens to contain the marker.
func isSkipTestError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "test must be skipped")
}
245
246
247 func (tc *TestCase) Run(ls LoggerSkipper) error {
248 if tc.SkipReason != nil {
249 ls.Skipf("skipping for reason: %q", *tc.SkipReason)
250 }
251 if skipReason, ok := skippedTestDescriptions[tc.Description]; ok {
252 ls.Skipf("skipping due to known failure: %v", skipReason)
253 }
254
255
256 if err := checkSchemaVersion(tc.schemaVersion); err != nil {
257 return fmt.Errorf("schema version %q not supported: %v", tc.schemaVersion, err)
258 }
259
260 testCtx := newTestContext(context.Background(), tc.entities, tc.ExpectLogMessages, tc.setsFailPoint())
261
262 defer func() {
263
264
265
266 for _, err := range disableUntargetedFailPoints(testCtx) {
267 ls.Log(err)
268 }
269 for _, err := range disableTargetedFailPoints(testCtx) {
270 ls.Log(err)
271 }
272 for _, err := range entities(testCtx).close(testCtx) {
273 ls.Log(err)
274 }
275
276
277
278 if tc.startsTransaction() && tc.killAllSessions {
279 if err := terminateOpenSessions(context.Background()); err != nil {
280 ls.Logf("error terminating open transactions after failed test: %v", err)
281 }
282 }
283
284 close(tc.loopDone)
285 }()
286
287
288 for _, collData := range tc.initialData {
289 if err := collData.createCollection(testCtx); err != nil {
290 return fmt.Errorf("error setting up collection %q: %v", collData.namespace(), err)
291 }
292 }
293
294
295
296
297
298 for idx, entity := range tc.createEntities {
299 for entityType, entityOptions := range entity {
300 if entityType == "client" && hasOperationalFailpoint(testCtx) {
301 entityOptions.setHeartbeatFrequencyMS(lowHeartbeatFrequency)
302 }
303
304 if err := tc.entities.addEntity(testCtx, entityType, entityOptions); err != nil {
305 if isSkipTestError(err) {
306 ls.Skip(err)
307 }
308
309 return fmt.Errorf("error creating entity at index %d: %v", idx, err)
310 }
311 }
312 }
313
314
315 if mtest.ClusterTopologyKind() == mtest.Sharded && tc.performsDistinct() {
316 if err := performDistinctWorkaround(testCtx); err != nil {
317 return fmt.Errorf("error performing \"distinct\" workaround: %v", err)
318 }
319 }
320
321 for idx, operation := range tc.Operations {
322 if err := operation.execute(testCtx, tc.loopDone); err != nil {
323 if isSkipTestError(err) {
324 ls.Skip(err)
325 }
326
327 return fmt.Errorf("error running operation %q at index %d: %v", operation.Name, idx, err)
328 }
329 }
330
331
332
333 logMessageValidator := newLogMessageValidator(tc)
334 go startLogValidators(testCtx, logMessageValidator)
335
336 for _, client := range tc.entities.clients() {
337 client.stopListeningForEvents()
338 }
339
340
341
342
343 if tc.Description != "BulkWrite on server that doesn't support arrayFilters with arrayFilters on second op" {
344 for idx, expectedEvents := range tc.ExpectedEvents {
345 if err := verifyEvents(testCtx, expectedEvents); err != nil {
346 return fmt.Errorf("events verification failed at index %d: %v", idx, err)
347 }
348 }
349 }
350
351 for idx, collData := range tc.Outcome {
352 if err := collData.verifyContents(testCtx); err != nil {
353 return fmt.Errorf("error verifying outcome for collection %q at index %d: %v",
354 collData.namespace(), idx, err)
355 }
356 }
357
358 {
359
360
361
362
363 ctx, cancel := context.WithTimeout(testCtx, logMessageValidatorTimeout)
364 defer cancel()
365
366
367
368 if err := stopLogValidators(ctx, logMessageValidator); err != nil {
369 return fmt.Errorf("error verifying log messages: %w", err)
370 }
371 }
372
373 return nil
374 }
375
376 func disableUntargetedFailPoints(ctx context.Context) []error {
377 var errs []error
378 for fpName, client := range failPoints(ctx) {
379 if err := disableFailPointWithClient(ctx, fpName, client); err != nil {
380 errs = append(errs, fmt.Errorf("error disabling fail point %q: %v", fpName, err))
381 }
382 }
383 return errs
384 }
385
386 func disableTargetedFailPoints(ctx context.Context) []error {
387 var errs []error
388 for fpName, host := range targetedFailPoints(ctx) {
389 commandFn := func(ctx context.Context, client *mongo.Client) error {
390 return disableFailPointWithClient(ctx, fpName, client)
391 }
392 if err := runCommandOnHost(ctx, host, commandFn); err != nil {
393 errs = append(errs, fmt.Errorf("error disabling targeted fail point %q on host %q: %v", fpName, host, err))
394 }
395 }
396 return errs
397 }
398
399 func disableFailPointWithClient(ctx context.Context, fpName string, client *mongo.Client) error {
400 cmd := bson.D{
401 {"configureFailPoint", fpName},
402 {"mode", "off"},
403 }
404 return client.Database("admin").RunCommand(ctx, cmd).Err()
405 }
406
View as plain text