...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/unified_spec_runner.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"io/ioutil"
    13  	"path"
    14  	"strings"
    15  	"testing"
    16  	"time"
    17  
    18  	"go.mongodb.org/mongo-driver/bson"
    19  	"go.mongodb.org/mongo-driver/internal/assert"
    20  	"go.mongodb.org/mongo-driver/internal/spectest"
    21  	"go.mongodb.org/mongo-driver/mongo"
    22  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    23  )
    24  
var (
	// skippedTestDescriptions maps the description of a spec test to the
	// reason it is skipped. TestCase.Run looks up each test case's description
	// in this map and skips the test when an entry is present.
	skippedTestDescriptions = map[string]string{
		// GODRIVER-1773: This test runs a "find" with limit=4 and batchSize=3. It expects batchSize values of three for
		// the "find" and one for the "getMore", but we send three for both.
		"A successful find event with a getmore and the server kills the cursor (<= 4.4)": "See GODRIVER-1773",

		// GODRIVER-2577: The following spec tests require canceling ops immediately, but the current logic clears pools
		// and cancels in-progress ops after two heartbeat failures.
		"Connection pool clear uses interruptInUseConnections=true after monitor timeout":                      "Godriver clears after multiple timeout",
		"Error returned from connection pool clear with interruptInUseConnections=true is retryable":           "Godriver clears after multiple timeout",
		"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",

		// TODO(GODRIVER-2843): Fix and unskip these test cases.
		"Find operation with snapshot":                                      "Test fails frequently. See GODRIVER-2843",
		"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",

		// TODO(GODRIVER-3043): Avoid Appending Write/Read Concern in Atlas Search
		// Index Helper Commands.
		"dropSearchIndex ignores read and write concern":       "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043",
		"listSearchIndexes ignores read and write concern":     "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043",
		"updateSearchIndex ignores the read and write concern": "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043",

		// TODO(GODRIVER-3137): Implement Gossip cluster time.
		"unpin after TransientTransactionError error on commit": "Implement GODRIVER-3137",

		// TODO(GODRIVER-3034): Drivers should unpin connections when ending a session
		"unpin on successful abort":                                   "Implement GODRIVER-3034",
		"unpin after non-transient error on abort":                    "Implement GODRIVER-3034",
		"unpin after TransientTransactionError error on abort":        "Implement GODRIVER-3034",
		"unpin when a new transaction is started":                     "Implement GODRIVER-3034",
		"unpin when a non-transaction write operation uses a session": "Implement GODRIVER-3034",
		"unpin when a non-transaction read operation uses a session":  "Implement GODRIVER-3034",

		// DRIVERS-2722: Setting "maxTimeMS" on a command that creates a cursor
		// also limits the lifetime of the cursor. That may be surprising to
		// users, so omit "maxTimeMS" from operations that return user-managed
		// cursors.
		"timeoutMS can be overridden for a find":                                               "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
		"timeoutMS can be configured for an operation - find on collection":                    "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
		"timeoutMS can be configured for an operation - aggregate on collection":               "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
		"timeoutMS can be configured for an operation - aggregate on database":                 "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
		"operation is retried multiple times for non-zero timeoutMS - find on collection":      "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
		"operation is retried multiple times for non-zero timeoutMS - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
		"operation is retried multiple times for non-zero timeoutMS - aggregate on database":   "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
	}

	// logMessageValidatorTimeout is the timeout TestCase.Run places on the
	// context used to wait for expected log messages. lowHeartbeatFrequency is
	// the heartbeat frequency TestCase.Run sets on client entities when the
	// test context reports an operational fail point.
	logMessageValidatorTimeout = 10 * time.Millisecond
	lowHeartbeatFrequency      = 50 * time.Millisecond
)
    74  
// TestCase holds and runs a unified spec test case
type TestCase struct {
	// Fields decoded from the test's entry in the "tests" array of a spec
	// file; the bson tags name the corresponding keys.
	Description       string               `bson:"description"`
	RunOnRequirements []mtest.RunOnBlock   `bson:"runOnRequirements"`
	SkipReason        *string              `bson:"skipReason"` // When non-nil, Run skips the test with this reason.
	Operations        []*operation         `bson:"operations"`
	ExpectedEvents    []*expectedEvents    `bson:"expectEvents"`
	ExpectLogMessages []*clientLogMessages `bson:"expectLogMessages"`
	Outcome           []*collectionData    `bson:"outcome"`

	// File-level values copied onto every test case by parseTestFile.
	// killAllSessions comes from the merged Options rather than from the file.
	initialData     []*collectionData
	createEntities  []map[string]*entityOptions
	killAllSessions bool
	schemaVersion   string

	// entities is the entity map created for this test case by parseTestFile.
	// loopDone is passed to every operation's execute call; EndLoop sends on
	// it and Run closes it when the test finishes.
	entities *EntityMap
	loopDone chan struct{}
}
    93  
    94  func (tc *TestCase) performsDistinct() bool {
    95  	return tc.performsOperation("distinct")
    96  }
    97  
    98  func (tc *TestCase) setsFailPoint() bool {
    99  	return tc.performsOperation("failPoint")
   100  }
   101  
   102  func (tc *TestCase) startsTransaction() bool {
   103  	return tc.performsOperation("startTransaction")
   104  }
   105  
   106  func (tc *TestCase) performsOperation(name string) bool {
   107  	for _, op := range tc.Operations {
   108  		if op.Name == name {
   109  			return true
   110  		}
   111  	}
   112  	return false
   113  }
   114  
// TestFile holds the contents of a unified spec test file. parseTestFile
// decodes a file into this type and then copies SchemaVersion, CreateEntities
// and InitialData onto every entry of TestCases.
type TestFile struct {
	Description       string                      `bson:"description"`
	SchemaVersion     string                      `bson:"schemaVersion"`
	RunOnRequirements []mtest.RunOnBlock          `bson:"runOnRequirements"` // File-level requirements, returned separately by parseTestFile.
	CreateEntities    []map[string]*entityOptions `bson:"createEntities"`
	InitialData       []*collectionData           `bson:"initialData"`
	TestCases         []*TestCase                 `bson:"tests"`
}
   124  
   125  // runTestDirectory runs the files in the given directory, which must be in the unified spec format, with
   126  // expectValidFail determining whether the tests should expect to pass or fail
   127  func runTestDirectory(t *testing.T, directoryPath string, expectValidFail bool) {
   128  	for _, filename := range spectest.FindJSONFilesInDir(t, directoryPath) {
   129  		t.Run(filename, func(t *testing.T) {
   130  			runTestFile(t, path.Join(directoryPath, filename), expectValidFail)
   131  		})
   132  	}
   133  }
   134  
   135  // runTestFile runs the tests in the given file, with expectValidFail determining whether the tests should expect to pass or fail
   136  func runTestFile(t *testing.T, filepath string, expectValidFail bool, opts ...*Options) {
   137  	content, err := ioutil.ReadFile(filepath)
   138  	assert.Nil(t, err, "ReadFile error for file %q: %v", filepath, err)
   139  
   140  	fileReqs, testCases := ParseTestFile(t, content, expectValidFail, opts...)
   141  
   142  	mtOpts := mtest.NewOptions().
   143  		RunOn(fileReqs...).
   144  		CreateClient(false)
   145  	mt := mtest.New(t, mtOpts)
   146  
   147  	for _, testCase := range testCases {
   148  		mtOpts := mtest.NewOptions().
   149  			RunOn(testCase.RunOnRequirements...).
   150  			CreateClient(false)
   151  
   152  		mt.RunOpts(testCase.Description, mtOpts, func(mt *mtest.T) {
   153  			defer func() {
   154  				// catch panics from looking up elements and fail if it's unexpected
   155  				if r := recover(); r != nil {
   156  					if !expectValidFail {
   157  						mt.Fatal(r)
   158  					}
   159  				}
   160  			}()
   161  			err := testCase.Run(mt)
   162  			if expectValidFail {
   163  				if err != nil {
   164  					return
   165  				}
   166  				mt.Fatalf("expected test to error, got nil")
   167  			}
   168  			if err != nil {
   169  				mt.Fatal(err)
   170  			}
   171  		})
   172  	}
   173  }
   174  
   175  func parseTestFile(testJSON []byte, opts ...*Options) ([]mtest.RunOnBlock, []*TestCase, error) {
   176  	var testFile TestFile
   177  	if err := bson.UnmarshalExtJSON(testJSON, false, &testFile); err != nil {
   178  		return nil, nil, err
   179  	}
   180  
   181  	op := MergeOptions(opts...)
   182  	for _, testCase := range testFile.TestCases {
   183  		testCase.initialData = testFile.InitialData
   184  		testCase.createEntities = testFile.CreateEntities
   185  		testCase.schemaVersion = testFile.SchemaVersion
   186  		testCase.entities = newEntityMap()
   187  		testCase.loopDone = make(chan struct{})
   188  		testCase.killAllSessions = *op.RunKillAllSessions
   189  	}
   190  
   191  	return testFile.RunOnRequirements, testFile.TestCases, nil
   192  }
   193  
   194  // ParseTestFile create an array of TestCases from the testJSON json blob
   195  func ParseTestFile(t *testing.T, testJSON []byte, expectValidFail bool, opts ...*Options) ([]mtest.RunOnBlock, []*TestCase) {
   196  	t.Helper()
   197  
   198  	runOnRequirements, testCases, err := parseTestFile(testJSON, opts...)
   199  
   200  	if !expectValidFail {
   201  		assert.NoError(t, err, "error parsing test file")
   202  	}
   203  
   204  	return runOnRequirements, testCases
   205  }
   206  
   207  // GetEntities returns a pointer to the EntityMap for the TestCase. This should not be called until after
   208  // the test is run
   209  func (tc *TestCase) GetEntities() *EntityMap {
   210  	return tc.entities
   211  }
   212  
   213  // EndLoop will cause the runner to stop a loop operation if one is included in the test. If the test has finished
   214  // running, this will panic
   215  func (tc *TestCase) EndLoop() {
   216  	tc.loopDone <- struct{}{}
   217  }
   218  
// LoggerSkipper is passed to TestCase.Run to allow it to perform logging and skipping operations
type LoggerSkipper interface {
	// Log and Logf are used by Run to report errors from test cleanup.
	Log(args ...interface{})
	Logf(format string, args ...interface{})
	// Skip and Skipf are used by Run to skip a test: Skip with a skip-test
	// error from entity creation or an operation, Skipf with a skip reason.
	Skip(args ...interface{})
	Skipf(format string, args ...interface{})
}
   226  
   227  // skipTestError indicates that a test must be skipped because the runner cannot execute it (e.g. the test requires
   228  // an operation or option that the driver does not support).
   229  type skipTestError struct {
   230  	reason string
   231  }
   232  
   233  // Error implements the error interface.
   234  func (s skipTestError) Error() string {
   235  	return fmt.Sprintf("test must be skipped: %q", s.reason)
   236  }
   237  
   238  func newSkipTestError(reason string) error {
   239  	return &skipTestError{reason}
   240  }
   241  
   242  func isSkipTestError(err error) bool {
   243  	return err != nil && strings.Contains(err.Error(), "test must be skipped")
   244  }
   245  
   246  // Run runs the TestCase and returns an error if it fails
   247  func (tc *TestCase) Run(ls LoggerSkipper) error {
   248  	if tc.SkipReason != nil {
   249  		ls.Skipf("skipping for reason: %q", *tc.SkipReason)
   250  	}
   251  	if skipReason, ok := skippedTestDescriptions[tc.Description]; ok {
   252  		ls.Skipf("skipping due to known failure: %v", skipReason)
   253  	}
   254  
   255  	// Validate that we support the schema declared by the test file before attempting to use its contents.
   256  	if err := checkSchemaVersion(tc.schemaVersion); err != nil {
   257  		return fmt.Errorf("schema version %q not supported: %v", tc.schemaVersion, err)
   258  	}
   259  
   260  	testCtx := newTestContext(context.Background(), tc.entities, tc.ExpectLogMessages, tc.setsFailPoint())
   261  
   262  	defer func() {
   263  		// If anything fails while doing test cleanup, we only log the error because the actual test may have already
   264  		// failed and that failure should be preserved.
   265  
   266  		for _, err := range disableUntargetedFailPoints(testCtx) {
   267  			ls.Log(err)
   268  		}
   269  		for _, err := range disableTargetedFailPoints(testCtx) {
   270  			ls.Log(err)
   271  		}
   272  		for _, err := range entities(testCtx).close(testCtx) {
   273  			ls.Log(err)
   274  		}
   275  		// Tests that started a transaction should terminate any sessions left open on the server. This is required even
   276  		// if the test attempted to commit/abort the transaction because an abortTransaction command can fail if it's
   277  		// sent to a mongos that isn't aware of the transaction.
   278  		if tc.startsTransaction() && tc.killAllSessions {
   279  			if err := terminateOpenSessions(context.Background()); err != nil {
   280  				ls.Logf("error terminating open transactions after failed test: %v", err)
   281  			}
   282  		}
   283  
   284  		close(tc.loopDone)
   285  	}()
   286  
   287  	// Set up collections based on the file-level initialData field.
   288  	for _, collData := range tc.initialData {
   289  		if err := collData.createCollection(testCtx); err != nil {
   290  			return fmt.Errorf("error setting up collection %q: %v", collData.namespace(), err)
   291  		}
   292  	}
   293  
   294  	// Set up entities based on the file-level createEntities field. For client entities, if the test will configure
   295  	// a fail point, set a low heartbeatFrequencyMS value into the URI options map if one is not already present.
   296  	// This speeds up recovery time for the client if the fail point forces the server to return a state change
   297  	// error.
   298  	for idx, entity := range tc.createEntities {
   299  		for entityType, entityOptions := range entity {
   300  			if entityType == "client" && hasOperationalFailpoint(testCtx) {
   301  				entityOptions.setHeartbeatFrequencyMS(lowHeartbeatFrequency)
   302  			}
   303  
   304  			if err := tc.entities.addEntity(testCtx, entityType, entityOptions); err != nil {
   305  				if isSkipTestError(err) {
   306  					ls.Skip(err)
   307  				}
   308  
   309  				return fmt.Errorf("error creating entity at index %d: %v", idx, err)
   310  			}
   311  		}
   312  	}
   313  
   314  	// Work around SERVER-39704.
   315  	if mtest.ClusterTopologyKind() == mtest.Sharded && tc.performsDistinct() {
   316  		if err := performDistinctWorkaround(testCtx); err != nil {
   317  			return fmt.Errorf("error performing \"distinct\" workaround: %v", err)
   318  		}
   319  	}
   320  
   321  	for idx, operation := range tc.Operations {
   322  		if err := operation.execute(testCtx, tc.loopDone); err != nil {
   323  			if isSkipTestError(err) {
   324  				ls.Skip(err)
   325  			}
   326  
   327  			return fmt.Errorf("error running operation %q at index %d: %v", operation.Name, idx, err)
   328  		}
   329  	}
   330  
   331  	// Create a validator for log messages and start the workers that will
   332  	// observe log messages as they occur operationally.
   333  	logMessageValidator := newLogMessageValidator(tc)
   334  	go startLogValidators(testCtx, logMessageValidator)
   335  
   336  	for _, client := range tc.entities.clients() {
   337  		client.stopListeningForEvents()
   338  	}
   339  
   340  	// One of the bulkWrite spec tests expects update and updateMany to be grouped together into a single batch,
   341  	// but this isn't the case because of GODRIVER-1157. To work around this, we skip event verification for this test.
   342  	// This guard should be removed when GODRIVER-1157 is done.
   343  	if tc.Description != "BulkWrite on server that doesn't support arrayFilters with arrayFilters on second op" {
   344  		for idx, expectedEvents := range tc.ExpectedEvents {
   345  			if err := verifyEvents(testCtx, expectedEvents); err != nil {
   346  				return fmt.Errorf("events verification failed at index %d: %v", idx, err)
   347  			}
   348  		}
   349  	}
   350  
   351  	for idx, collData := range tc.Outcome {
   352  		if err := collData.verifyContents(testCtx); err != nil {
   353  			return fmt.Errorf("error verifying outcome for collection %q at index %d: %v",
   354  				collData.namespace(), idx, err)
   355  		}
   356  	}
   357  
   358  	{
   359  		// Create a context with a deadline to use for log message
   360  		// validation. This will prevent any blocking from test cases
   361  		// with N messages where only N - K (0 < K < N) messages are
   362  		// observed.
   363  		ctx, cancel := context.WithTimeout(testCtx, logMessageValidatorTimeout)
   364  		defer cancel()
   365  
   366  		// For each client, verify that all expected log messages were
   367  		// received.
   368  		if err := stopLogValidators(ctx, logMessageValidator); err != nil {
   369  			return fmt.Errorf("error verifying log messages: %w", err)
   370  		}
   371  	}
   372  
   373  	return nil
   374  }
   375  
   376  func disableUntargetedFailPoints(ctx context.Context) []error {
   377  	var errs []error
   378  	for fpName, client := range failPoints(ctx) {
   379  		if err := disableFailPointWithClient(ctx, fpName, client); err != nil {
   380  			errs = append(errs, fmt.Errorf("error disabling fail point %q: %v", fpName, err))
   381  		}
   382  	}
   383  	return errs
   384  }
   385  
   386  func disableTargetedFailPoints(ctx context.Context) []error {
   387  	var errs []error
   388  	for fpName, host := range targetedFailPoints(ctx) {
   389  		commandFn := func(ctx context.Context, client *mongo.Client) error {
   390  			return disableFailPointWithClient(ctx, fpName, client)
   391  		}
   392  		if err := runCommandOnHost(ctx, host, commandFn); err != nil {
   393  			errs = append(errs, fmt.Errorf("error disabling targeted fail point %q on host %q: %v", fpName, host, err))
   394  		}
   395  	}
   396  	return errs
   397  }
   398  
   399  func disableFailPointWithClient(ctx context.Context, fpName string, client *mongo.Client) error {
   400  	cmd := bson.D{
   401  		{"configureFailPoint", fpName},
   402  		{"mode", "off"},
   403  	}
   404  	return client.Database("admin").RunCommand(ctx, cmd).Err()
   405  }
   406  

View as plain text