
Source file src/go.mongodb.org/mongo-driver/mongo/integration/server_selection_prose_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

// Copyright (C) MongoDB, Inc. 2022-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package integration

import (
	"context"
	"sync"
	"testing"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/internal/assert"
	"go.mongodb.org/mongo-driver/internal/eventtest"
	"go.mongodb.org/mongo-driver/internal/require"
	"go.mongodb.org/mongo-driver/mongo/description"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// saturatedConnections is a set of connection IDs that have reported a
// ConnectionReady event.
type saturatedConnections map[uint64]bool

// saturatedHosts maps a host address to the set of connections that have become
// ready on that host, maintaining information about pool events for specific
// host+connection combinations.
type saturatedHosts map[string]saturatedConnections

func (set saturatedHosts) add(host string, connectionID uint64) {
	if set[host] == nil {
		set[host] = make(saturatedConnections)
	}
	set[host][connectionID] = true
}

// isSaturated returns true when each host on the cluster URI has at least the
// tolerable number of ready connections.
func (set saturatedHosts) isSaturated(tolerance uint64) bool {
	for _, host := range options.Client().ApplyURI(mtest.ClusterURI()).Hosts {
		if cxns := set[host]; cxns == nil || uint64(len(cxns)) < tolerance {
			return false
		}
	}
	return true
}

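// For illustration only (not part of the original test file): a minimal sketch of
// how saturatedHosts tracks readiness, assuming a cluster URI that lists two
// hypothetical mongos hosts and a tolerance of 2.
//
//	set := make(saturatedHosts)
//	set.add("mongos1:27017", 1)
//	set.add("mongos1:27017", 2)
//	set.add("mongos2:27017", 7)
//	// Still false: "mongos2:27017" has only 1 ready connection.
//	_ = set.isSaturated(2)
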
// awaitSaturation uses CMAP events to ensure that the client's connection pool for
// each mongos host has been saturated. A host qualifies as "saturated" once it has
// a tolerable number of ready connections.
func awaitSaturation(ctx context.Context, mt *mtest.T, monitor *eventtest.TestPoolMonitor, tolerance uint64) error {
	set := make(saturatedHosts)
	var err error
	for !set.isSaturated(tolerance) {
		if err = ctx.Err(); err != nil {
			break
		}
		if err = mt.Coll.FindOne(ctx, bson.D{}).Err(); err != nil {
			break
		}
		monitor.Events(func(evt *event.PoolEvent) bool {
			// Add host only when the connection is ready for use.
			if evt.Type == event.ConnectionReady {
				set.add(evt.Address, evt.ConnectionID)
			}
			return true
		})
	}
	return err
}

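// For illustration only (not part of the original test file): callers are expected
// to bound awaitSaturation with a context deadline, as the prose tests below do. If
// the pools never fill, the loop exits with the context error (for example,
// context.DeadlineExceeded).
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	err := awaitSaturation(ctx, mt, tpm, maxPoolSize)
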
// runsServerSelection runs opCount-many `FindOne` operations in each of threadCount-many
// goroutines. The purpose of this is to test the reliability of the server selection
// algorithm, which can be verified with the returned per-host `counts` map and
// `event.PoolEvent` slice.
func runsServerSelection(mt *mtest.T, monitor *eventtest.TestPoolMonitor,
	threadCount, opCount int) (map[string]int, []*event.PoolEvent) {
	var wg sync.WaitGroup
	for i := 0; i < threadCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for i := 0; i < opCount; i++ {
				res := mt.Coll.FindOne(context.Background(), bson.D{})
				assert.NoError(mt.T, res.Err(), "FindOne() error for Collection '%s'", mt.Coll.Name())
			}
		}()
	}
	wg.Wait()

	// Get all checkOut events and calculate the number of times each server was selected. The prose test spec says to
	// use command monitoring events, but those don't include the server address, so use checkOut events instead.
	checkOutEvents := monitor.Events(func(evt *event.PoolEvent) bool {
		return evt.Type == event.GetStarted
	})
	counts := make(map[string]int)
	for _, evt := range checkOutEvents {
		counts[evt.Address]++
	}
	assert.Equal(mt, 2, len(counts), "expected exactly 2 server addresses")
	return counts, checkOutEvents
}

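// For illustration only (not part of the original test file): the returned values let
// a caller compute how often a given host was selected, which is how the prose tests
// below make their assertions. The host address here is hypothetical.
//
//	counts, checkOutEvents := runsServerSelection(mt, tpm, 10, 10)
//	frequency := float64(counts["mongos1:27017"]) / float64(len(checkOutEvents))
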
// TestServerSelectionProse implements the Server Selection prose tests:
// https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection-tests.rst
func TestServerSelectionProse(t *testing.T) {
	const maxPoolSize = 10
	const localThreshold = 30 * time.Second

	mt := mtest.New(t, mtest.NewOptions().CreateClient(false))

	mtOpts := mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.9")
	mt.RunOpts("operationCount-based selection within latency window, with failpoint", mtOpts, func(mt *mtest.T) {
		_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
		require.NoError(mt, err, "InsertOne() error")

		hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
		require.GreaterOrEqualf(mt, len(hosts), 2, "test cluster must have at least 2 mongos hosts")

		// Set a failpoint on a specific mongos host that delays all "find" commands for 500ms. We
		// need to know which mongos we set the failpoint on for our assertions later.
		failpointHost := hosts[0]
		mt.ResetClient(options.Client().
			SetHosts([]string{failpointHost}))
		mt.SetFailPoint(mtest.FailPoint{
			ConfigureFailPoint: "failCommand",
			Mode: mtest.FailPointMode{
				Times: 10000,
			},
			Data: mtest.FailPointData{
				FailCommands:    []string{"find"},
				BlockConnection: true,
				BlockTimeMS:     500,
				AppName:         "loadBalancingTest",
			},
		})
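		// For illustration only (not part of the original test file): a sketch of the
		// failpoint command document that the configuration above corresponds to,
		// expressed as a bson.D:
		//
		//	bson.D{
		//		{"configureFailPoint", "failCommand"},
		//		{"mode", bson.D{{"times", 10000}}},
		//		{"data", bson.D{
		//			{"failCommands", bson.A{"find"}},
		//			{"blockConnection", true},
		//			{"blockTimeMS", 500},
		//			{"appName", "loadBalancingTest"},
		//		}},
		//	}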
		// The automatic failpoint clearing may not clear failpoints set on specific hosts, so
		// manually clear the failpoint we set on the specific mongos when the test is done.
		defer func() {
			mt.ResetClient(options.Client().
				SetHosts([]string{failpointHost}))
			mt.ClearFailPoints()
		}()

		// Reset the client with exactly 2 mongos hosts. Use a ServerMonitor to wait for both mongos
		// host descriptions to move from kind "Unknown" to kind "Mongos".
		topologyEvents := make(chan *event.TopologyDescriptionChangedEvent, 10)
		tpm := eventtest.NewTestPoolMonitor()
		mt.ResetClient(options.Client().
			SetLocalThreshold(localThreshold).
			SetMaxPoolSize(maxPoolSize).
			SetMinPoolSize(maxPoolSize).
			SetHosts(hosts[:2]).
			SetPoolMonitor(tpm.PoolMonitor).
			SetAppName("loadBalancingTest").
			SetServerMonitor(&event.ServerMonitor{
				TopologyDescriptionChanged: func(evt *event.TopologyDescriptionChangedEvent) {
					topologyEvents <- evt
				},
			}))
		for evt := range topologyEvents {
			servers := evt.NewDescription.Servers
			if len(servers) == 2 && servers[0].Kind == description.Mongos && servers[1].Kind == description.Mongos {
				break
			}
		}
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		if err := awaitSaturation(ctx, mt, tpm, maxPoolSize); err != nil {
			mt.Fatalf("Error awaiting saturation: %v", err.Error())
		}

		counts, checkOutEvents := runsServerSelection(mt, tpm, 10, 10)
		// Calculate the frequency that the server with the failpoint was selected. Assert that it
		// was selected less than 25% of the time.
		frequency := float64(counts[failpointHost]) / float64(len(checkOutEvents))
		assert.Lessf(mt,
			frequency,
			0.25,
			"expected failpoint host %q to be selected less than 25%% of the time",
			failpointHost)
	})

	mtOpts = mtest.NewOptions().Topologies(mtest.Sharded)
	mt.RunOpts("operationCount-based selection within latency window, no failpoint", mtOpts, func(mt *mtest.T) {
		// TODO(GODRIVER-2842): Fix and unskip this test case.
		mt.Skip("Test fails frequently, skipping. See GODRIVER-2842")

		_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
		require.NoError(mt, err, "InsertOne() error")

		hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
		require.GreaterOrEqualf(mt, len(hosts), 2, "test cluster must have at least 2 mongos hosts")

		// Reset the client with exactly 2 mongos hosts. Use a ServerMonitor to wait for both mongos
		// host descriptions to move from kind "Unknown" to kind "Mongos".
		topologyEvents := make(chan *event.TopologyDescriptionChangedEvent, 10)
		tpm := eventtest.NewTestPoolMonitor()
		mt.ResetClient(options.Client().
			SetHosts(hosts[:2]).
			SetPoolMonitor(tpm.PoolMonitor).
			SetLocalThreshold(localThreshold).
			SetMaxPoolSize(maxPoolSize).
			SetMinPoolSize(maxPoolSize).
			SetServerMonitor(&event.ServerMonitor{
				TopologyDescriptionChanged: func(evt *event.TopologyDescriptionChangedEvent) {
					topologyEvents <- evt
				},
			}))
		for evt := range topologyEvents {
			servers := evt.NewDescription.Servers
			if len(servers) == 2 && servers[0].Kind == description.Mongos && servers[1].Kind == description.Mongos {
				break
			}
		}
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		if err := awaitSaturation(ctx, mt, tpm, maxPoolSize); err != nil {
			mt.Fatalf("Error awaiting saturation: %v", err.Error())
		}

		counts, checkOutEvents := runsServerSelection(mt, tpm, 10, 100)
		// Calculate the frequency that each server was selected. Assert that each server was
		// selected 50% (+/- 10%) of the time.
		for addr, count := range counts {
			frequency := float64(count) / float64(len(checkOutEvents))
			assert.InDeltaf(mt,
				0.5,
				frequency,
				0.1,
				"expected server %q to be selected 50%% (+/- 10%%) of the time, but was selected %v%% of the time",
				addr, frequency*100)
		}
	})
}

