...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/rtt_monitor_test.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/topology

     1  // Copyright (C) MongoDB, Inc. 2022-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package topology
     8  
     9  import (
    10  	"bytes"
    11  	"context"
    12  	"io"
    13  	"math"
    14  	"net"
    15  	"sync"
    16  	"sync/atomic"
    17  	"testing"
    18  	"time"
    19  
    20  	"go.mongodb.org/mongo-driver/internal/assert"
    21  	"go.mongodb.org/mongo-driver/internal/require"
    22  	"go.mongodb.org/mongo-driver/mongo/address"
    23  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    24  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    25  	"go.mongodb.org/mongo-driver/x/mongo/driver/drivertest"
    26  	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
    27  )
    28  
    29  func makeHelloReply() []byte {
    30  	doc := bsoncore.NewDocumentBuilder().AppendInt32("ok", 1).Build()
    31  	return drivertest.MakeReply(doc)
    32  }
    33  
    34  var _ net.Conn = &mockSlowConn{}
    35  
    36  type mockSlowConn struct {
    37  	reader *bytes.Reader
    38  	delay  time.Duration
    39  	closed atomic.Value
    40  }
    41  
    42  // newMockSlowConn returns a net.Conn that reads from the specified response after blocking for a
    43  // delay duration. Calls to Write() reset the read buffer, so subsequent Read() calls read from the
    44  // beginning of the provided response.
    45  func newMockSlowConn(response []byte, delay time.Duration) *mockSlowConn {
    46  	var closed atomic.Value
    47  	closed.Store(false)
    48  
    49  	return &mockSlowConn{
    50  		reader: bytes.NewReader(response),
    51  		delay:  delay,
    52  		closed: closed,
    53  	}
    54  }
    55  
    56  func (msc *mockSlowConn) Read(b []byte) (int, error) {
    57  	time.Sleep(msc.delay)
    58  	if msc.closed.Load().(bool) {
    59  		return 0, io.ErrUnexpectedEOF
    60  	}
    61  	return msc.reader.Read(b)
    62  }
    63  
    64  func (msc *mockSlowConn) Write(b []byte) (int, error) {
    65  	if msc.closed.Load().(bool) {
    66  		return 0, io.ErrUnexpectedEOF
    67  	}
    68  	_, err := msc.reader.Seek(0, io.SeekStart)
    69  	return len(b), err
    70  }
    71  
    72  // Close closes the mock connection. All subsequent calls to Read or Write return error
    73  // io.ErrUnexpectedEOF. It is not safe to call Close concurrently with Read or Write.
    74  func (msc *mockSlowConn) Close() error {
    75  	msc.closed.Store(true)
    76  	return nil
    77  }
    78  
    79  func (*mockSlowConn) LocalAddr() net.Addr                { return nil }
    80  func (*mockSlowConn) RemoteAddr() net.Addr               { return nil }
    81  func (*mockSlowConn) SetDeadline(_ time.Time) error      { return nil }
    82  func (*mockSlowConn) SetReadDeadline(_ time.Time) error  { return nil }
    83  func (*mockSlowConn) SetWriteDeadline(_ time.Time) error { return nil }
    84  
    85  func TestRTTMonitor(t *testing.T) {
    86  	t.Run("measures the average, minimum and 90th percentile RTT", func(t *testing.T) {
    87  		t.Parallel()
    88  
    89  		dialer := DialerFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
    90  			return newMockSlowConn(makeHelloReply(), 10*time.Millisecond), nil
    91  		})
    92  		rtt := newRTTMonitor(&rttConfig{
    93  			interval: 10 * time.Millisecond,
    94  			createConnectionFn: func() *connection {
    95  				return newConnection("", WithDialer(func(Dialer) Dialer { return dialer }))
    96  			},
    97  			createOperationFn: func(conn driver.Connection) *operation.Hello {
    98  				return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
    99  			},
   100  		})
   101  		rtt.connect()
   102  		defer rtt.disconnect()
   103  
   104  		assert.Eventuallyf(
   105  			t,
   106  			func() bool { return rtt.EWMA() > 0 && rtt.Min() > 0 && rtt.P90() > 0 },
   107  			1*time.Second,
   108  			10*time.Millisecond,
   109  			"expected EWMA(), Min() and P90() to return positive durations within 1 second")
   110  		assert.True(
   111  			t,
   112  			rtt.EWMA() > 0,
   113  			"expected EWMA() to return a positive duration, got %v",
   114  			rtt.EWMA())
   115  		assert.True(
   116  			t,
   117  			rtt.Min() > 0,
   118  			"expected Min() to return a positive duration, got %v",
   119  			rtt.Min())
   120  		assert.True(
   121  			t,
   122  			rtt.P90() > 0,
   123  			"expected P90() to return a positive duration, got %v",
   124  			rtt.P90())
   125  	})
   126  
   127  	t.Run("creates the correct size samples slice", func(t *testing.T) {
   128  		t.Parallel()
   129  
   130  		cases := []struct {
   131  			desc           string
   132  			interval       time.Duration
   133  			wantSamplesLen int
   134  		}{
   135  			{
   136  				desc:           "default",
   137  				interval:       10 * time.Second,
   138  				wantSamplesLen: 30,
   139  			},
   140  			{
   141  				desc:           "min",
   142  				interval:       10 * time.Minute,
   143  				wantSamplesLen: 10,
   144  			},
   145  			{
   146  				desc:           "max",
   147  				interval:       1 * time.Millisecond,
   148  				wantSamplesLen: 500,
   149  			},
   150  		}
   151  		for _, tc := range cases {
   152  			t.Run(tc.desc, func(t *testing.T) {
   153  				rtt := newRTTMonitor(&rttConfig{
   154  					interval:     tc.interval,
   155  					minRTTWindow: 5 * time.Minute,
   156  				})
   157  				assert.Equal(t, tc.wantSamplesLen, len(rtt.samples), "expected samples length to match")
   158  			})
   159  		}
   160  	})
   161  
   162  	t.Run("can connect and disconnect repeatedly", func(t *testing.T) {
   163  		t.Parallel()
   164  
   165  		dialer := DialerFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
   166  			return newMockSlowConn(makeHelloReply(), 10*time.Millisecond), nil
   167  		})
   168  		rtt := newRTTMonitor(&rttConfig{
   169  			interval: 10 * time.Second,
   170  			createConnectionFn: func() *connection {
   171  				return newConnection("", WithDialer(func(Dialer) Dialer {
   172  					return dialer
   173  				}))
   174  			},
   175  			createOperationFn: func(conn driver.Connection) *operation.Hello {
   176  				return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
   177  			},
   178  		})
   179  		for i := 0; i < 100; i++ {
   180  			rtt.connect()
   181  			rtt.disconnect()
   182  		}
   183  	})
   184  
   185  	t.Run("works after reset", func(t *testing.T) {
   186  		t.Parallel()
   187  
   188  		dialer := DialerFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
   189  			return newMockSlowConn(makeHelloReply(), 10*time.Millisecond), nil
   190  		})
   191  		rtt := newRTTMonitor(&rttConfig{
   192  			interval: 10 * time.Millisecond,
   193  			createConnectionFn: func() *connection {
   194  				return newConnection("", WithDialer(func(Dialer) Dialer { return dialer }))
   195  			},
   196  			createOperationFn: func(conn driver.Connection) *operation.Hello {
   197  				return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
   198  			},
   199  		})
   200  		rtt.connect()
   201  		defer rtt.disconnect()
   202  
   203  		for i := 0; i < 3; i++ {
   204  			assert.Eventuallyf(
   205  				t,
   206  				func() bool { return rtt.EWMA() > 0 },
   207  				1*time.Second,
   208  				10*time.Millisecond,
   209  				"expected EWMA() to return a positive duration within 1 second")
   210  			assert.Eventuallyf(
   211  				t,
   212  				func() bool { return rtt.Min() > 0 },
   213  				1*time.Second,
   214  				10*time.Millisecond,
   215  				"expected Min() to return a positive duration within 1 second")
   216  			assert.Eventuallyf(
   217  				t,
   218  				func() bool { return rtt.P90() > 0 },
   219  				1*time.Second,
   220  				10*time.Millisecond,
   221  				"expected P90() to return a positive duration within 1 second")
   222  			rtt.reset()
   223  		}
   224  	})
   225  
   226  	// GODRIVER-2464
   227  	// Test that the RTT monitor can continue monitoring server RTTs after an operation gets stuck.
   228  	// An operation can get stuck if the server or a proxy stops responding to requests on the RTT
   229  	// connection but does not close the TCP socket, effectively creating an operation that will
   230  	// never complete.
   231  	t.Run("stuck operations time out", func(t *testing.T) {
   232  		t.Parallel()
   233  
   234  		// Start a goroutine that listens for and accepts TCP connections, reads requests, and
   235  		// responds with {"ok": 1}. The first 2 connections simulate "stuck" connections and never
   236  		// respond or close.
   237  		l, err := net.Listen("tcp", "localhost:0")
   238  		require.NoError(t, err)
   239  		var wg sync.WaitGroup
   240  		wg.Add(1)
   241  		go func() {
   242  			defer wg.Done()
   243  
   244  			for i := 0; ; i++ {
   245  				conn, err := l.Accept()
   246  				if err != nil {
   247  					// The listen loop is cancelled by closing the listener, so there will always be
   248  					// an error here. Log the error to make debugging easier in case of unexpected
   249  					// errors.
   250  					t.Logf("Accept error: %v", err)
   251  					return
   252  				}
   253  
   254  				// Only close connections when the listener loop returns to prevent closing "stuck"
   255  				// connections while the test is running.
   256  				defer conn.Close()
   257  
   258  				wg.Add(1)
   259  				go func(i int) {
   260  					defer wg.Done()
   261  
   262  					buf := make([]byte, 256)
   263  					for {
   264  						if _, err := conn.Read(buf); err != nil {
   265  							// The connection read/write loop is cancelled by closing the connection,
   266  							// so may be an expected error here. Log the error to make debugging
   267  							// easier in case of unexpected errors.
   268  							t.Logf("Read error: %v", err)
   269  							return
   270  						}
   271  
   272  						// For the first 2 connections, read the request but never respond and don't
   273  						// close the connection. That simulates the behavior of a "stuck" connection.
   274  						if i < 2 {
   275  							return
   276  						}
   277  
   278  						// Delay for 10ms so that systems with limited timing granularity (e.g. some
   279  						// older versions of Windows) can measure a non-zero latency.
   280  						time.Sleep(10 * time.Millisecond)
   281  
   282  						if _, err := conn.Write(makeHelloReply()); err != nil {
   283  							// The connection read/write loop is cancelled by closing the connection,
   284  							// so may be an expected error here. Log the error to make debugging
   285  							// easier in case of unexpected errors.
   286  							t.Logf("Write error: %v", err)
   287  							return
   288  						}
   289  					}
   290  				}(i)
   291  			}
   292  		}()
   293  
   294  		rtt := newRTTMonitor(&rttConfig{
   295  			interval: 10 * time.Millisecond,
   296  			timeout:  100 * time.Millisecond,
   297  			createConnectionFn: func() *connection {
   298  				return newConnection(address.Address(l.Addr().String()))
   299  			},
   300  			createOperationFn: func(conn driver.Connection) *operation.Hello {
   301  				return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
   302  			},
   303  		})
   304  		rtt.connect()
   305  
   306  		assert.Eventuallyf(
   307  			t,
   308  			func() bool { return rtt.EWMA() > 0 },
   309  			1*time.Second,
   310  			10*time.Millisecond,
   311  			"expected EWMA() to return a positive duration within 1 second")
   312  		assert.Eventuallyf(
   313  			t,
   314  			func() bool { return rtt.Min() > 0 },
   315  			1*time.Second,
   316  			10*time.Millisecond,
   317  			"expected Min() to return a positive duration within 1 second")
   318  		assert.Eventuallyf(
   319  			t,
   320  			func() bool { return rtt.P90() > 0 },
   321  			1*time.Second,
   322  			10*time.Millisecond,
   323  			"expected P90() to return a positive duration within 1 second")
   324  
   325  		rtt.disconnect()
   326  		l.Close()
   327  		wg.Wait()
   328  	})
   329  }
   330  
   331  func TestMin(t *testing.T) {
   332  	cases := []struct {
   333  		desc       string
   334  		samples    []time.Duration
   335  		minSamples int
   336  		want       time.Duration
   337  	}{
   338  		{
   339  			desc:       "Should return the min for minSamples = 0",
   340  			samples:    []time.Duration{1, 0, 0, 0},
   341  			minSamples: 0,
   342  			want:       1,
   343  		},
   344  		{
   345  			desc:       "Should return 0 for fewer than minSamples samples",
   346  			samples:    []time.Duration{1, 0, 0, 0},
   347  			minSamples: 2,
   348  			want:       0,
   349  		},
   350  		{
   351  			desc:       "Should return 0 for empty samples slice",
   352  			samples:    []time.Duration{},
   353  			minSamples: 0,
   354  			want:       0,
   355  		},
   356  		{
   357  			desc:       "Should return 0 for no valid samples",
   358  			samples:    []time.Duration{0, 0, 0},
   359  			minSamples: 0,
   360  			want:       0,
   361  		},
   362  		{
   363  			desc:       "Should return max int64 if all samples are max int64",
   364  			samples:    []time.Duration{math.MaxInt64, math.MaxInt64, math.MaxInt64},
   365  			minSamples: 0,
   366  			want:       math.MaxInt64,
   367  		},
   368  		{
   369  			desc:       "Should return the minimum if there are enough samples",
   370  			samples:    []time.Duration{1 * time.Second, 100 * time.Millisecond, 150 * time.Millisecond, 0, 0, 0},
   371  			minSamples: 3,
   372  			want:       100 * time.Millisecond,
   373  		},
   374  		{
   375  			desc:       "Should return 0 if there are are not enough samples",
   376  			samples:    []time.Duration{1 * time.Second, 100 * time.Millisecond, 0, 0, 0, 0},
   377  			minSamples: 3,
   378  			want:       0,
   379  		},
   380  	}
   381  
   382  	for _, tc := range cases {
   383  		tc := tc
   384  		t.Run(tc.desc, func(t *testing.T) {
   385  			t.Parallel()
   386  
   387  			got := min(tc.samples, tc.minSamples)
   388  			assert.Equal(t, tc.want, got, "unexpected result from min()")
   389  		})
   390  	}
   391  }
   392  
   393  func TestPercentile(t *testing.T) {
   394  	cases := []struct {
   395  		desc       string
   396  		samples    []time.Duration
   397  		minSamples int
   398  		percentile float64
   399  		want       time.Duration
   400  	}{
   401  		{
   402  			desc:       "Should return 0 for fewer than minSamples samples",
   403  			samples:    []time.Duration{1, 0, 0, 0},
   404  			minSamples: 2,
   405  			percentile: 90.0,
   406  			want:       0,
   407  		},
   408  		{
   409  			desc:       "Should return 0 for empty samples slice",
   410  			samples:    []time.Duration{},
   411  			minSamples: 0,
   412  			percentile: 90.0,
   413  			want:       0,
   414  		},
   415  		{
   416  			desc:       "Should return 0 for no valid samples",
   417  			samples:    []time.Duration{0, 0, 0},
   418  			minSamples: 0,
   419  			percentile: 90.0,
   420  			want:       0,
   421  		},
   422  		{
   423  			desc:       "First tertile when minSamples = 0",
   424  			samples:    []time.Duration{1, 2, 3, 0, 0, 0},
   425  			minSamples: 0,
   426  			percentile: 33.34,
   427  			want:       1,
   428  		},
   429  		{
   430  			desc: "90th percentile when there are enough samples",
   431  			samples: []time.Duration{
   432  				100 * time.Millisecond,
   433  				200 * time.Millisecond,
   434  				300 * time.Millisecond,
   435  				400 * time.Millisecond,
   436  				500 * time.Millisecond,
   437  				600 * time.Millisecond,
   438  				700 * time.Millisecond,
   439  				800 * time.Millisecond,
   440  				900 * time.Millisecond,
   441  				1 * time.Second,
   442  				0, 0, 0},
   443  			minSamples: 10,
   444  			percentile: 90.0,
   445  			want:       900 * time.Millisecond,
   446  		},
   447  		{
   448  			desc: "10th percentile when there are enough samples",
   449  			samples: []time.Duration{
   450  				100 * time.Millisecond,
   451  				200 * time.Millisecond,
   452  				300 * time.Millisecond,
   453  				400 * time.Millisecond,
   454  				500 * time.Millisecond,
   455  				600 * time.Millisecond,
   456  				700 * time.Millisecond,
   457  				800 * time.Millisecond,
   458  				900 * time.Millisecond,
   459  				1 * time.Second,
   460  				0, 0, 0},
   461  			minSamples: 10,
   462  			percentile: 10.0,
   463  			want:       100 * time.Millisecond,
   464  		},
   465  	}
   466  
   467  	for _, tc := range cases {
   468  		tc := tc
   469  		t.Run(tc.desc, func(t *testing.T) {
   470  			t.Parallel()
   471  
   472  			got := percentile(tc.percentile, tc.samples, tc.minSamples)
   473  			assert.Equal(t, tc.want, got, "unexpected result from percentile()")
   474  		})
   475  	}
   476  }
   477  

View as plain text