...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/CMAP_spec_test.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/topology

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package topology
     8  
     9  import (
    10  	"context"
    11  	"encoding/json"
    12  	"fmt"
    13  	"io/ioutil"
    14  	"net"
    15  	"path"
    16  	"strings"
    17  	"sync"
    18  	"sync/atomic"
    19  	"testing"
    20  	"time"
    21  
    22  	"go.mongodb.org/mongo-driver/bson/primitive"
    23  	"go.mongodb.org/mongo-driver/event"
    24  	"go.mongodb.org/mongo-driver/internal/require"
    25  	"go.mongodb.org/mongo-driver/internal/spectest"
    26  	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
    27  )
    28  
// skippedTestDescriptions is a collection of test descriptions that the test runner will skip. The
// map format is {"test description": "reason", }. runCMAPTest looks up each test file's
// description in this map and, when present, passes the reason to t.Skip.
var skippedTestDescriptions = map[string]string{
	// GODRIVER-1827: These 2 tests assert that in-use connections are not closed until checked
	// back into a closed pool, but the Go connection pool aggressively closes in-use connections.
	// That behavior is currently required by the "Client.Disconnect" API, so skip the tests.
	"When a pool is closed, it MUST first destroy all available connections in that pool": "test requires that close does not aggressively close used connections",
	"must destroy checked in connection if pool has been closed":                          "test requires that close does not aggressively close used connections",
	// GODRIVER-1826: The load-balancer SDAM error handling test "errors during authentication are
	// processed" currently asserts that handshake errors trigger events "pool cleared" then
	// "connection closed". However, the "error during minPoolSize population clears pool" test
	// asserts that handshake errors trigger events "connection closed" then "pool cleared". The Go
	// driver uses the same code path for creating all application connections, so those opposing
	// event orders cannot be satisfied simultaneously.
	// TODO(DRIVERS-1785): Re-enable this test once the spec test is updated to use the same event
	// order as the "errors during authentication are processed" load-balancer SDAM spec test.
	"error during minPoolSize population clears pool": "event ordering is incompatible with load-balancer SDAM spec test (DRIVERS-1785)",
	// GODRIVER-1826: The Go connection pool does not currently always deliver connections created
	// by maintain() to waiting check-outs. There is a race condition between the goroutine started
	// by maintain() to check-in a requested connection and createConnections() picking up the next
	// wantConn created by the waiting check-outs. Most of the time, createConnections() wins and
	// starts creating new connections. That is not a problem for general use cases, but it prevents
	// the "threads blocked by maxConnecting check out minPoolSize connections" test from passing.
	// TODO(DRIVERS-2225): Re-enable this test once the spec test is updated to support the Go pool
	// minPoolSize maintain() behavior.
	"threads blocked by maxConnecting check out minPoolSize connections": "test requires that connections established by minPoolSize are immediately used to satisfy check-out requests (DRIVERS-2225)",
	// GODRIVER-1826: The Go connection pool currently delivers any available connection to the
	// earliest waiting check-out request, regardless of whether that check-out request has already
	// requested a new connection. That behavior is currently incompatible with the "threads blocked
	// by maxConnecting check out returned connections" test, which expects that check-out requests
	// that request a new connection cannot be satisfied by a check-in.
	// TODO(DRIVERS-2223): Re-enable this test once the spec test is updated to support the Go pool check-in behavior.
	"threads blocked by maxConnecting check out returned connections": "test requires a checked-in connections cannot satisfy a check-out waiting on a new connection (DRIVERS-2223)",
}
    63  
// cmapEvent is one expected pool event decoded from the "events" array of a spec test file.
// checkEvents always compares EventType; the remaining fields are only compared when they are
// non-zero (non-nil for Address and Options). Address and Options are decoded as interface{}
// because checkEvents accepts the number 42 in either field as an "any value" placeholder, and
// likewise treats a ConnectionID of 42 as "any non-zero connection ID".
type cmapEvent struct {
	EventType    string      `json:"type"`
	Address      interface{} `json:"address"`
	ConnectionID uint64      `json:"connectionId"`
	Options      interface{} `json:"options"`
	Reason       string      `json:"reason"`
}
    71  
// poolOptions is the "poolOptions" object of a spec test file. runCMAPTest converts the size,
// idle-time and background-interval values into server options, and passes WaitQueueTimeoutMS to
// runOperation, where a non-zero value becomes the check-out context timeout in milliseconds.
type poolOptions struct {
	MaxPoolSize                int32 `json:"maxPoolSize"`
	MinPoolSize                int32 `json:"minPoolSize"`
	MaxConnecting              int32 `json:"maxConnecting"`
	MaxIdleTimeMS              int32 `json:"maxIdleTimeMS"`
	WaitQueueTimeoutMS         int32 `json:"waitQueueTimeoutMS"`
	BackgroundThreadIntervalMS int32 `json:"backgroundThreadIntervalMS"`
}
    80  
// cmapTestFile is the top-level JSON document of a single spec test file, as unmarshalled by
// runCMAPTest. SkipReason and Description drive skipping, FailPoint configures the mock dialer,
// Operations are executed in order, Error is the expected error (if any), Events are the expected
// pool events, and Ignore lists event types that are dropped before comparison. Version and Style
// are decoded but not read by the code in this file.
type cmapTestFile struct {
	Version     uint64                   `json:"version"`
	Style       string                   `json:"style"`
	Description string                   `json:"description"`
	SkipReason  string                   `json:"skipReason"`
	FailPoint   map[string]interface{}   `json:"failPoint"`
	PoolOptions poolOptions              `json:"poolOptions"`
	Operations  []map[string]interface{} `json:"operations"`
	Error       *cmapTestError           `json:"error"`
	Events      []cmapEvent              `json:"events"`
	Ignore      []string                 `json:"ignore"`
}
    93  
// cmapTestError is the expected error of a spec test. runCMAPTest only inspects Message, which it
// lower-cases before comparing against the errors produced by the operations; ErrorType and
// Address are decoded but not read by the code in this file.
type cmapTestError struct {
	ErrorType string `json:"type"`
	Message   string `json:"message"`
	Address   string `json:"address"`
}
    99  
// simThread models one named "thread" of a spec test. runOperation starts a goroutine that runs
// the functions sent on JobQueue one at a time. JobsAssigned is incremented when a job is queued
// and JobsCompleted when it has finished; both are accessed with sync/atomic, and a thread is
// considered idle when the two counters are equal.
type simThread struct {
	JobQueue      chan func()
	JobsAssigned  int32
	JobsCompleted int32
}
   105  
// testInfo is the mutable state shared by all operations of one spec test run.
//
//   - objects maps a check-out label to the value returned by that check-out.
//   - originalEventChan receives every event published by the pool monitor.
//   - finalEventChan holds the events, in order, that checkEvents later compares.
//   - threads maps a thread name to its simulated thread.
//   - backgroundThreadErrors collects the result of every operation run on a simulated thread.
//   - eventCounts counts, per event type, the events consumed by "waitForEvent" operations.
//
// The embedded mutex is taken around accesses to the objects, threads and eventCounts maps.
type testInfo struct {
	objects                map[string]interface{}
	originalEventChan      chan *event.PoolEvent
	finalEventChan         chan *event.PoolEvent
	threads                map[string]*simThread
	backgroundThreadErrors chan error
	eventCounts            map[string]uint64
	sync.Mutex
}
   115  
// cmapTestDir is the relative path of the directory whose JSON files TestCMAPSpec enumerates and
// runCMAPTest reads.
const cmapTestDir = "../../../../testdata/connection-monitoring-and-pooling/"
   117  
   118  func TestCMAPSpec(t *testing.T) {
   119  	for _, testFileName := range spectest.FindJSONFilesInDir(t, cmapTestDir) {
   120  		t.Run(testFileName, func(t *testing.T) {
   121  			runCMAPTest(t, testFileName)
   122  		})
   123  	}
   124  }
   125  
// runCMAPTest loads the spec test file testFileName from cmapTestDir and runs it against a Server
// whose connections come from a mock dialer. It skips the test when the file carries a skipReason
// or its description is listed in skippedTestDescriptions, executes every operation, waits for all
// simulated threads to become idle, and finally checks the expected error (if any) and the
// expected pool events.
func runCMAPTest(t *testing.T, testFileName string) {
	content, err := ioutil.ReadFile(path.Join(cmapTestDir, testFileName))
	require.NoErrorf(t, err, "unable to read content of test file")

	var test cmapTestFile
	err = json.Unmarshal(content, &test)
	require.NoErrorf(t, err, "error unmarshalling testFile")

	if test.SkipReason != "" {
		t.Skip(test.SkipReason)
	}
	if msg, ok := skippedTestDescriptions[test.Description]; ok {
		t.Skip(msg)
	}

	testInfo := &testInfo{
		objects:                make(map[string]interface{}),
		originalEventChan:      make(chan *event.PoolEvent, 200),
		finalEventChan:         make(chan *event.PoolEvent, 200),
		threads:                make(map[string]*simThread),
		eventCounts:            make(map[string]uint64),
		backgroundThreadErrors: make(chan error, 100),
	}

	// Translate the test's poolOptions into server options. Every option ignores the value it is
	// given and returns the value from the test file. The pool monitor forwards each event to
	// originalEventChan.
	sOpts := []ServerOption{
		WithMaxConnections(func(uint64) uint64 {
			return uint64(test.PoolOptions.MaxPoolSize)
		}),
		WithMinConnections(func(uint64) uint64 {
			return uint64(test.PoolOptions.MinPoolSize)
		}),
		WithMaxConnecting(func(uint64) uint64 {
			return uint64(test.PoolOptions.MaxConnecting)
		}),
		WithConnectionPoolMaxIdleTime(func(time.Duration) time.Duration {
			return time.Duration(test.PoolOptions.MaxIdleTimeMS) * time.Millisecond
		}),
		WithConnectionPoolMaintainInterval(func(time.Duration) time.Duration {
			return time.Duration(test.PoolOptions.BackgroundThreadIntervalMS) * time.Millisecond
		}),
		WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor {
			return &event.PoolMonitor{
				Event: func(evt *event.PoolEvent) { testInfo.originalEventChan <- evt },
			}
		}),
	}

	// Failpoint settings applied by the mock dialer below: delay is passed to newMockSlowConn,
	// and closeConnection makes the dialer close each connection before returning it.
	var delay time.Duration
	var closeConnection bool

	if test.FailPoint != nil {
		data, ok := test.FailPoint["data"].(map[string]interface{})
		if !ok {
			t.Fatalf("expected to find \"data\" map in failPoint (%v)", test.FailPoint)
		}

		// blockTimeMS only takes effect when blockConnection is also true.
		blockConnection, _ := data["blockConnection"].(bool)
		if blockTimeMS, ok := data["blockTimeMS"].(float64); ok && blockConnection {
			delay = time.Duration(blockTimeMS) * time.Millisecond
		}

		closeConnection, _ = data["closeConnection"].(bool)
	}

	// Use a dialer that returns mock connections that always respond with a
	// "hello" reply. If there's a failpoint configured in the test, use a
	// dialer that returns connections that mock the configured failpoint.
	sOpts = append(sOpts, WithConnectionOptions(func(...ConnectionOption) []ConnectionOption {
		return []ConnectionOption{
			WithDialer(func(Dialer) Dialer {
				return DialerFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
					msc := newMockSlowConn(makeHelloReply(), delay)
					if closeConnection {
						msc.Close()
					}
					return msc, nil
				})
			}),
			WithHandshaker(func(h Handshaker) Handshaker {
				return operation.NewHello()
			}),
		}
	}))

	// The server state is set directly rather than through a connect call.
	// NOTE(review): presumably so check-outs are accepted without starting monitoring; confirm.
	s := NewServer("mongodb://fake", primitive.NewObjectID(), sOpts...)
	s.state = serverConnected
	// NOTE(review): err has not been reassigned since the json.Unmarshal check above, so this
	// assertion cannot fail as written.
	require.NoError(t, err, "error connecting connection pool")
	defer s.pool.close(context.Background())

	// Run the operations in order. At most one error from the primary thread is tolerated; it is
	// kept in err for the expected-error check below.
	for _, op := range test.Operations {
		if tempErr := runOperation(t, op, testInfo, s, test.PoolOptions.WaitQueueTimeoutMS); tempErr != nil {
			if err != nil {
				t.Fatalf("received multiple errors in primary thread: %v and %v", err, tempErr)
			}
			err = tempErr
		}
	}

	// make sure all threads have finished
	testInfo.Lock()
	threadNames := make([]string, 0)
	for threadName := range testInfo.threads {
		threadNames = append(threadNames, threadName)
	}
	testInfo.Unlock()

	for _, threadName := range threadNames {
		// Spin until the thread has an empty queue and has completed every assigned job. The
		// break leaves the loop with the lock still held, so the JobQueue is closed under the
		// lock and the lock is released afterwards.
	WAIT:
		for {
			testInfo.Lock()
			thread, ok := testInfo.threads[threadName]
			if !ok {
				t.Fatalf("thread was unexpectedly ended: %v", threadName)
			}
			if len(thread.JobQueue) == 0 && atomic.LoadInt32(&thread.JobsCompleted) == atomic.LoadInt32(&thread.JobsAssigned) {
				break WAIT
			}
			testInfo.Unlock()
		}
		close(testInfo.threads[threadName].JobQueue)
		testInfo.Unlock()
	}

	// If the primary thread did not produce the expected error, accept any background thread
	// error whose text contains the lower-cased expected message.
	if test.Error != nil {
		if err == nil || strings.ToLower(test.Error.Message) != err.Error() {
			var erroredCorrectly bool
			errs := make([]error, 0, len(testInfo.backgroundThreadErrors)+1)
			errs = append(errs, err)
			for len(testInfo.backgroundThreadErrors) > 0 {
				bgErr := <-testInfo.backgroundThreadErrors
				errs = append(errs, bgErr)
				if bgErr != nil && strings.Contains(bgErr.Error(), strings.ToLower(test.Error.Message)) {
					erroredCorrectly = true
					break
				}
			}
			if !erroredCorrectly {
				t.Fatalf("error differed from expected error, expected: %v, actual errors received: %v", test.Error.Message, errs)
			}
		}
	}

	// Move the events that no "waitForEvent" operation consumed into finalEventChan so that
	// checkEvents sees the complete, ordered event stream.
	testInfo.Lock()
	defer testInfo.Unlock()
	for len(testInfo.originalEventChan) > 0 {
		temp := <-testInfo.originalEventChan
		testInfo.finalEventChan <- temp
	}

	checkEvents(t, test.Events, testInfo.finalEventChan, test.Ignore)

}
   278  
   279  func checkEvents(t *testing.T, expectedEvents []cmapEvent, actualEvents chan *event.PoolEvent, ignoreEvents []string) {
   280  	for _, expectedEvent := range expectedEvents {
   281  		validEvent := nextValidEvent(t, actualEvents, ignoreEvents)
   282  
   283  		if expectedEvent.EventType != validEvent.Type {
   284  			var reason string
   285  			if validEvent.Type == "ConnectionCheckOutFailed" {
   286  				reason = ": " + validEvent.Reason
   287  			}
   288  			t.Errorf("unexpected event occurred: expected: %v, actual: %v%v", expectedEvent.EventType, validEvent.Type, reason)
   289  		}
   290  
   291  		if expectedEvent.Address != nil {
   292  
   293  			if expectedEvent.Address == float64(42) { // can be any address
   294  				if validEvent.Address == "" {
   295  					t.Errorf("expected address in event, instead received none in %v", expectedEvent.EventType)
   296  				}
   297  			} else { // must be specific address
   298  				addr, ok := expectedEvent.Address.(string)
   299  				if !ok {
   300  					t.Errorf("received non string address: %v", expectedEvent.Address)
   301  				}
   302  				if addr != validEvent.Address {
   303  					t.Errorf("received unexpected address: %v, expected: %v", validEvent.Address, expectedEvent.Address)
   304  				}
   305  			}
   306  		}
   307  
   308  		if expectedEvent.ConnectionID != 0 {
   309  			if expectedEvent.ConnectionID == 42 {
   310  				if validEvent.ConnectionID == 0 {
   311  					t.Errorf("expected a connectionId but found none in %v", validEvent.Type)
   312  				}
   313  			} else if expectedEvent.ConnectionID != validEvent.ConnectionID {
   314  				t.Errorf("expected and actual connectionIds differed: expected: %v, actual: %v for event: %v", expectedEvent.ConnectionID, validEvent.ConnectionID, expectedEvent.EventType)
   315  			}
   316  		}
   317  
   318  		if expectedEvent.Reason != "" && expectedEvent.Reason != validEvent.Reason {
   319  			t.Errorf("event reason differed from expected: expected: %v, actual: %v for %v", expectedEvent.Reason, validEvent.Reason, expectedEvent.EventType)
   320  		}
   321  
   322  		if expectedEvent.Options != nil {
   323  			if expectedEvent.Options == float64(42) {
   324  				if validEvent.PoolOptions == nil {
   325  					t.Errorf("expected poolevent options but found none")
   326  				}
   327  			} else {
   328  				opts, ok := expectedEvent.Options.(map[string]interface{})
   329  				if !ok {
   330  					t.Errorf("event options were unexpected type: %T for %v", expectedEvent.Options, expectedEvent.EventType)
   331  				}
   332  
   333  				if maxSize, ok := opts["maxPoolSize"]; ok && validEvent.PoolOptions.MaxPoolSize != uint64(maxSize.(float64)) {
   334  					t.Errorf("event's max pool size differed from expected: %v, actual: %v", maxSize, validEvent.PoolOptions.MaxPoolSize)
   335  				}
   336  
   337  				if minSize, ok := opts["minPoolSize"]; ok && validEvent.PoolOptions.MinPoolSize != uint64(minSize.(float64)) {
   338  					t.Errorf("event's min pool size differed from expected: %v, actual: %v", minSize, validEvent.PoolOptions.MinPoolSize)
   339  				}
   340  
   341  				if waitQueueTimeoutMS, ok := opts["waitQueueTimeoutMS"]; ok && validEvent.PoolOptions.WaitQueueTimeoutMS != uint64(waitQueueTimeoutMS.(float64)) {
   342  					t.Errorf("event's min pool size differed from expected: %v, actual: %v", waitQueueTimeoutMS, validEvent.PoolOptions.WaitQueueTimeoutMS)
   343  				}
   344  			}
   345  		}
   346  	}
   347  }
   348  
   349  func nextValidEvent(t *testing.T, events chan *event.PoolEvent, ignoreEvents []string) *event.PoolEvent {
   350  	t.Helper()
   351  NextEvent:
   352  	for {
   353  		if len(events) == 0 {
   354  			t.Fatalf("unable to get next event. too few events occurred")
   355  		}
   356  
   357  		event := <-events
   358  		for _, Type := range ignoreEvents {
   359  			if event.Type == Type {
   360  				continue NextEvent
   361  			}
   362  		}
   363  		return event
   364  	}
   365  }
   366  
   367  func runOperation(t *testing.T, operation map[string]interface{}, testInfo *testInfo, s *Server, checkOutTimeout int32) error {
   368  	threadName, ok := operation["thread"]
   369  	if ok { // to be run in background thread
   370  		testInfo.Lock()
   371  		thread, ok := testInfo.threads[threadName.(string)]
   372  		if !ok {
   373  			thread = &simThread{
   374  				JobQueue: make(chan func(), 200),
   375  			}
   376  			testInfo.threads[threadName.(string)] = thread
   377  
   378  			go func() {
   379  				for {
   380  					job, more := <-thread.JobQueue
   381  					if !more {
   382  						break
   383  					}
   384  					job()
   385  					atomic.AddInt32(&thread.JobsCompleted, 1)
   386  				}
   387  			}()
   388  		}
   389  		testInfo.Unlock()
   390  
   391  		atomic.AddInt32(&thread.JobsAssigned, 1)
   392  		thread.JobQueue <- func() {
   393  			err := runOperationInThread(t, operation, testInfo, s, checkOutTimeout)
   394  			testInfo.backgroundThreadErrors <- err
   395  		}
   396  
   397  		return nil // since we don't care about errors occurring in non primary threads
   398  	}
   399  	return runOperationInThread(t, operation, testInfo, s, checkOutTimeout)
   400  }
   401  
// runOperationInThread performs the operation named by operation["name"] on the calling goroutine
// and returns the operation's error, if any. Supported names are "start", "wait",
// "waitForThread", "waitForEvent", "checkOut", "checkIn", "clear", "close" and "ready"; any other
// name, or a missing required field, fails the test. checkOutTimeout is the check-out timeout in
// milliseconds, where 0 means no timeout.
func runOperationInThread(t *testing.T, operation map[string]interface{}, testInfo *testInfo, s *Server, checkOutTimeout int32) error {
	name, ok := operation["name"]
	if !ok {
		t.Fatalf("unable to find name in operation")
	}

	switch name {
	case "start":
		return nil // we dont need to start another thread since this has already been done in runOperation
	case "wait":
		// Sleep for the number of milliseconds given in "ms".
		timeMs, ok := operation["ms"]
		if !ok {
			t.Fatalf("unable to find ms in wait operation")
		}
		dur := time.Duration(int64(timeMs.(float64))) * time.Millisecond
		time.Sleep(dur)
	case "waitForThread":
		threadName, ok := operation["target"]
		if !ok {
			t.Fatalf("unable to waitForThread without specified threadName")
		}

		testInfo.Lock()
		thread, ok := testInfo.threads[threadName.(string)]
		testInfo.Unlock()
		if !ok {
			t.Fatalf("unable to find thread to wait for: %v", threadName)
		}

		// Spin until the target thread has completed every job assigned to it.
		for {
			if atomic.LoadInt32(&thread.JobsCompleted) == atomic.LoadInt32(&thread.JobsAssigned) {
				break
			}
		}
	case "waitForEvent":
		var targetCount int
		{
			f, ok := operation["count"].(float64)
			if !ok {
				t.Fatalf("count is required to waitForEvent")
			}
			targetCount = int(f)
		}

		targetEventName, ok := operation["event"].(string)
		if !ok {
			t.Fatalf("event is require to waitForEvent")
		}

		// If there is a timeout specified in the "waitForEvent" operation, then use that timeout.
		// Otherwise, use a default timeout of 10s when waiting for events. Using a default timeout
		// prevents the Go test runner from timing out, which just prints a stack trace and no
		// information about what event the test was waiting for.
		timeout := 10 * time.Second
		if timeoutMS, ok := operation["timeout"].(float64); ok {
			timeout = time.Duration(timeoutMS) * time.Millisecond
		}

		originalChan := testInfo.originalEventChan
		finalChan := testInfo.finalEventChan

		// Move events from the original channel to the final channel one at a time, counting
		// them per type in testInfo.eventCounts. The loop ends when the event just received has
		// the target type and its count equals targetCount. The timeout applies to each
		// individual receive.
		for {
			var event *event.PoolEvent
			{
				timer := time.NewTimer(timeout)
				select {
				case event = <-originalChan:
				case <-timer.C:
					t.Fatalf("timed out waiting for %d %q events", targetCount, targetEventName)
				}
				timer.Stop()
			}
			finalChan <- event

			testInfo.Lock()
			_, ok = testInfo.eventCounts[event.Type]
			if !ok {
				testInfo.eventCounts[event.Type] = 0
			}
			testInfo.eventCounts[event.Type]++
			count := testInfo.eventCounts[event.Type]
			testInfo.Unlock()

			if event.Type == targetEventName && count == uint64(targetCount) {
				break
			}
		}
	case "checkOut":
		// Only bound the check-out with a deadline when a non-zero timeout was configured.
		checkoutContext := context.Background()
		if checkOutTimeout != 0 {
			var cancel context.CancelFunc
			checkoutContext, cancel = context.WithTimeout(context.Background(), time.Duration(checkOutTimeout)*time.Millisecond)
			defer cancel()
		}

		// The result is stored under the label even when the check-out returned an error.
		c, err := s.Connection(checkoutContext)
		if label, ok := operation["label"]; ok {
			testInfo.Lock()
			testInfo.objects[label.(string)] = c
			testInfo.Unlock()
		}

		return err
	case "checkIn":
		cName, ok := operation["connection"]
		if !ok {
			t.Fatalf("unable to find connection to checkin")
		}

		// Remove the labelled object so it cannot be checked in twice.
		var cEmptyInterface interface{}
		testInfo.Lock()
		cEmptyInterface, ok = testInfo.objects[cName.(string)]
		delete(testInfo.objects, cName.(string))
		testInfo.Unlock()
		if !ok {
			t.Fatalf("was unable to find %v in objects when expected", cName)
		}

		c, ok := cEmptyInterface.(*Connection)
		if !ok {
			t.Fatalf("object in objects was expected to be a connection, but was instead a %T", cEmptyInterface)
		}
		return c.Close()
	case "clear":
		// "interruptInUseConnections": true selects clearAll; otherwise clear is used.
		needInterruption, ok := operation["interruptInUseConnections"].(bool)
		if ok && needInterruption {
			s.pool.clearAll(fmt.Errorf("spec test clear"), nil)
		} else {
			s.pool.clear(fmt.Errorf("spec test clear"), nil)
		}
	case "close":
		s.pool.close(context.Background())
	case "ready":
		return s.pool.ready()
	default:
		t.Fatalf("unknown operation: %v", name)
	}

	return nil
}
   542  

View as plain text