...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/sdam_prose_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"context"
    11  	"net"
    12  	"os"
    13  	"runtime"
    14  	"testing"
    15  	"time"
    16  
    17  	"go.mongodb.org/mongo-driver/bson/primitive"
    18  	"go.mongodb.org/mongo-driver/event"
    19  	"go.mongodb.org/mongo-driver/internal/assert"
    20  	"go.mongodb.org/mongo-driver/internal/handshake"
    21  	"go.mongodb.org/mongo-driver/internal/require"
    22  	"go.mongodb.org/mongo-driver/mongo/address"
    23  	"go.mongodb.org/mongo-driver/mongo/description"
    24  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    25  	"go.mongodb.org/mongo-driver/mongo/options"
    26  	"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
    27  )
    28  
    29  func TestSDAMProse(t *testing.T) {
    30  	mt := mtest.New(t)
    31  
    32  	// Server limits non-streaming heartbeats and explicit server transition checks to at most one
    33  	// per 500ms. Set the test interval to 500ms to minimize the difference between the behavior of
    34  	// streaming and non-streaming heartbeat intervals.
    35  	heartbeatInterval := 500 * time.Millisecond
    36  	heartbeatIntervalClientOpts := options.Client().
    37  		SetHeartbeatInterval(heartbeatInterval)
    38  	heartbeatIntervalMtOpts := mtest.NewOptions().
    39  		ClientOptions(heartbeatIntervalClientOpts).
    40  		CreateCollection(false).
    41  		ClientType(mtest.Proxy).
    42  		MinServerVersion("4.4") // RTT Monitor / Streaming protocol is not supported for versions < 4.4.
    43  	mt.RunOpts("heartbeats processed more frequently", heartbeatIntervalMtOpts, func(mt *mtest.T) {
    44  		// Test that setting heartbeat interval to 500ms causes the client to process heartbeats
    45  		// approximately every 500ms instead of the default 10s. Note that a Client doesn't
    46  		// guarantee that it will process heartbeats exactly every 500ms, just that it will wait at
    47  		// least 500ms between heartbeats (and should process heartbeats more frequently for shorter
    48  		// interval settings).
    49  		//
    50  		// For number of nodes N, interval I, and duration D, a Client should process at most X
    51  		// operations:
    52  		//
    53  		//   X = (N * (2 handshakes + D/I heartbeats + D/I RTTs))
    54  		//
    55  		// Assert that a Client processes the expected number of operations for heartbeats sent at
    56  		// an interval between I and 2*I to account for different actual heartbeat intervals under
    57  		// different runtime conditions.
    58  
    59  		// Measure the actual amount of time between the start of the test and when we inspect the
    60  		// sent messages. The sleep duration will be at least the specified duration but
    61  		// possibly longer, which could lead to extra heartbeat messages, so account for that in
    62  		// the assertions.
    63  		if len(os.Getenv("DOCKER_RUNNING")) > 0 {
    64  			mt.Skip("skipping test in docker environment")
    65  		}
    66  		start := time.Now()
    67  		time.Sleep(2 * time.Second)
    68  		messages := mt.GetProxiedMessages()
    69  		duration := time.Since(start)
    70  
    71  		numNodes := len(options.Client().ApplyURI(mtest.ClusterURI()).Hosts)
    72  		maxExpected := numNodes * (2 + 2*int(duration/heartbeatInterval))
    73  		minExpected := numNodes * (2 + 2*int(duration/(heartbeatInterval*2)))
    74  
    75  		assert.True(
    76  			mt,
    77  			len(messages) >= minExpected && len(messages) <= maxExpected,
    78  			"expected number of messages to be in range [%d, %d], got %d"+
    79  				" (num nodes = %d, duration = %v, interval = %v)",
    80  			minExpected,
    81  			maxExpected,
    82  			len(messages),
    83  			numNodes,
    84  			duration,
    85  			heartbeatInterval)
    86  	})
    87  
    88  	mt.RunOpts("rtt tests", noClientOpts, func(mt *mtest.T) {
    89  		clientOpts := options.Client().
    90  			SetHeartbeatInterval(500 * time.Millisecond).
    91  			SetAppName("streamingRttTest")
    92  		mtOpts := mtest.NewOptions().
    93  			MinServerVersion("4.4").
    94  			ClientOptions(clientOpts)
    95  		mt.RunOpts("rtt is continuously updated", mtOpts, func(mt *mtest.T) {
    96  			// Test that the RTT monitor updates the RTT for server descriptions.
    97  
    98  			// The server has been discovered by the create command issued by mtest. Sleep for two seconds to allow
    99  			// multiple heartbeats to finish.
   100  			testTopology := getTopologyFromClient(mt.Client)
   101  			time.Sleep(2 * time.Second)
   102  			for _, serverDesc := range testTopology.Description().Servers {
   103  				assert.NotEqual(mt, description.Unknown, serverDesc.Kind, "server %v is Unknown", serverDesc)
   104  				assert.True(mt, serverDesc.AverageRTTSet, "AverageRTTSet for server description %v is false", serverDesc)
   105  
   106  				if runtime.GOOS != "windows" {
   107  					// Windows has a lower time resolution than other platforms, which causes the reported RTT to be
   108  					// 0 if it's below some threshold. The assertion above already confirms that the RTT is set to
   109  					// a value, so we can skip this assertion on Windows.
   110  					assert.True(mt, serverDesc.AverageRTT > 0, "server description %v has 0 RTT", serverDesc)
   111  				}
   112  			}
   113  
   114  			// Force hello requests to block for 500ms and wait until a server's average RTT goes over 250ms.
   115  			mt.SetFailPoint(mtest.FailPoint{
   116  				ConfigureFailPoint: "failCommand",
   117  				Mode: mtest.FailPointMode{
   118  					Times: 1000,
   119  				},
   120  				Data: mtest.FailPointData{
   121  					FailCommands:    []string{handshake.LegacyHello, "hello"},
   122  					BlockConnection: true,
   123  					BlockTimeMS:     500,
   124  					AppName:         "streamingRttTest",
   125  				},
   126  			})
   127  			callback := func(ctx context.Context) {
   128  				for {
   129  					// Stop loop if callback has been canceled.
   130  					select {
   131  					case <-ctx.Done():
   132  						return
   133  					default:
   134  					}
   135  
   136  					// We don't know which server received the failpoint command, so we wait until any of the server
   137  					// RTTs cross the threshold.
   138  					for _, serverDesc := range testTopology.Description().Servers {
   139  						if serverDesc.AverageRTT > 250*time.Millisecond {
   140  							return
   141  						}
   142  					}
   143  
   144  					// The next update will be in ~500ms.
   145  					time.Sleep(500 * time.Millisecond)
   146  				}
   147  			}
   148  			assert.Soon(t, callback, defaultCallbackTimeout)
   149  		})
   150  	})
   151  
   152  	mt.RunOpts("client waits between failed Hellos", mtest.NewOptions().MinServerVersion("4.9").Topologies(mtest.Single), func(mt *mtest.T) {
   153  		// Force hello requests to fail 5 times.
   154  		mt.SetFailPoint(mtest.FailPoint{
   155  			ConfigureFailPoint: "failCommand",
   156  			Mode: mtest.FailPointMode{
   157  				Times: 5,
   158  			},
   159  			Data: mtest.FailPointData{
   160  				FailCommands: []string{handshake.LegacyHello, "hello"},
   161  				ErrorCode:    1234,
   162  				AppName:      "SDAMMinHeartbeatFrequencyTest",
   163  			},
   164  		})
   165  
   166  		// Reset client options to use direct connection, app name, and 5s SS timeout.
   167  		clientOpts := options.Client().SetDirect(true).
   168  			SetAppName("SDAMMinHeartbeatFrequencyTest").
   169  			SetServerSelectionTimeout(5 * time.Second)
   170  		mt.ResetClient(clientOpts)
   171  
   172  		// Assert that Ping completes successfully within 2 to 3.5 seconds.
   173  		start := time.Now()
   174  		err := mt.Client.Ping(context.Background(), nil)
   175  		assert.Nil(mt, err, "Ping error: %v", err)
   176  		pingTime := time.Since(start)
   177  		assert.True(mt, pingTime > 2000*time.Millisecond && pingTime < 3500*time.Millisecond,
   178  			"expected Ping to take between 2 and 3.5 seconds, took %v seconds", pingTime.Seconds())
   179  
   180  	})
   181  }
   182  
   183  func TestServerHeartbeatStartedEvent(t *testing.T) {
   184  	t.Run("emits the first HeartbeatStartedEvent before the monitoring socket was created", func(t *testing.T) {
   185  		t.Parallel()
   186  
   187  		const address = address.Address("localhost:9999")
   188  		expectedEvents := []string{
   189  			"serverHeartbeatStartedEvent",
   190  			"client connected",
   191  			"client hello received",
   192  			"serverHeartbeatFailedEvent",
   193  		}
   194  
   195  		events := make(chan string)
   196  
   197  		listener, err := net.Listen("tcp", address.String())
   198  		assert.NoError(t, err)
   199  		defer listener.Close()
   200  		go func() {
   201  			conn, err := listener.Accept()
   202  			assert.NoError(t, err)
   203  			defer conn.Close()
   204  
   205  			events <- "client connected"
   206  			_, _ = conn.Read(nil)
   207  			events <- "client hello received"
   208  		}()
   209  
   210  		server := topology.NewServer(
   211  			address,
   212  			primitive.NewObjectID(),
   213  			topology.WithServerMonitor(func(*event.ServerMonitor) *event.ServerMonitor {
   214  				return &event.ServerMonitor{
   215  					ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
   216  						events <- "serverHeartbeatStartedEvent"
   217  					},
   218  					ServerHeartbeatFailed: func(e *event.ServerHeartbeatFailedEvent) {
   219  						events <- "serverHeartbeatFailedEvent"
   220  					},
   221  				}
   222  			}),
   223  		)
   224  		require.NoError(t, server.Connect(nil))
   225  
   226  		ticker := time.NewTicker(5 * time.Second)
   227  		defer ticker.Stop()
   228  
   229  		actualEvents := make([]string, 0, len(expectedEvents))
   230  		for len(actualEvents) < len(expectedEvents) {
   231  			select {
   232  			case event := <-events:
   233  				actualEvents = append(actualEvents, event)
   234  			case <-ticker.C:
   235  				assert.FailNow(t, "timed out for incoming event")
   236  			}
   237  		}
   238  		assert.Equal(t, expectedEvents, actualEvents)
   239  	})
   240  }
   241  

View as plain text