
Source file src/go.mongodb.org/mongo-driver/mongo/integration/retryable_reads_prose_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

// Copyright (C) MongoDB, Inc. 2022-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package integration

import (
	"context"
	"sync"
	"testing"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/internal/assert"
	"go.mongodb.org/mongo-driver/internal/eventtest"
	"go.mongodb.org/mongo-driver/internal/require"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func TestRetryableReadsProse(t *testing.T) {
	tpm := eventtest.NewTestPoolMonitor()

	// Client options with MaxPoolSize of 1 and RetryReads used per the test
	// description. A lower HeartbeatInterval is used to speed the test up for
	// any server that uses streaming heartbeats. Only connect to the first
	// host in the list for sharded clusters.
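	// A MaxPoolSize of 1 also forces the two concurrent finds below to
	// contend for a single connection, which is what makes the pool-clear
	// behavior observable.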
	hosts := mtest.ClusterConnString().Hosts
	clientOpts := options.Client().SetMaxPoolSize(1).SetRetryReads(true).
		SetPoolMonitor(tpm.PoolMonitor).SetHeartbeatInterval(500 * time.Millisecond).
		SetHosts(hosts[:1])

	mtOpts := mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("4.3")
	mt := mtest.New(t, mtOpts)

	mt.Run("PoolClearedError retryability", func(mt *mtest.T) {
		if mtest.ClusterTopologyKind() == mtest.LoadBalanced {
			mt.Skip("skipping as load balanced topology has different pool clearing behavior")
		}

		// Insert a document into the test collection.
		_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
		assert.Nil(mt, err, "InsertOne error: %v", err)

		// Force Find to block for 1 second once.
		mt.SetFailPoint(mtest.FailPoint{
			ConfigureFailPoint: "failCommand",
			Mode: mtest.FailPointMode{
				Times: 1,
			},
			Data: mtest.FailPointData{
				FailCommands:    []string{"find"},
				ErrorCode:       91,
				BlockConnection: true,
				BlockTimeMS:     1000,
			},
		})
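
		// Error code 91 (ShutdownInProgress) is a state-change error that
		// causes the driver to clear the connection pool. BlockConnection
		// holds the failing find's connection checked out for the full
		// BlockTimeMS, so the second concurrent find below must queue behind
		// it on the single-connection pool.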

		// Clear CMAP and command events.
		tpm.ClearEvents()
		mt.ClearEvents()

		// Perform a FindOne on two different goroutines and assert both
		// operations are successful.
		var wg sync.WaitGroup
		for i := 0; i < 2; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				res := mt.Coll.FindOne(context.Background(), bson.D{})
				assert.Nil(mt, res.Err(), "FindOne error: %v", res.Err())
			}()
		}
		wg.Wait()

		// Gather GetSucceeded, GetFailed, and PoolCleared pool events.
		events := tpm.Events(func(e *event.PoolEvent) bool {
			getSucceeded := e.Type == event.GetSucceeded
			getFailed := e.Type == event.GetFailed
			poolCleared := e.Type == event.PoolCleared
			return getSucceeded || getFailed || poolCleared
		})

		// Assert that the first check out succeeds, the pool is cleared, and
		// the second check out fails due to a connection error.
		assert.True(mt, len(events) >= 3, "expected at least 3 events, got %v", len(events))
		assert.Equal(mt, event.GetSucceeded, events[0].Type,
			"expected ConnectionCheckedOut event, got %v", events[0].Type)
		assert.Equal(mt, event.PoolCleared, events[1].Type,
			"expected ConnectionPoolCleared event, got %v", events[1].Type)
		assert.Equal(mt, event.GetFailed, events[2].Type,
			"expected ConnectionCheckedOutFailed event, got %v", events[2].Type)
		assert.Equal(mt, event.ReasonConnectionErrored, events[2].Reason,
			"expected check out failure due to connection error, failed due to %q", events[2].Reason)
		// Assert that three find CommandStartedEvents were observed.
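		// Three events are expected: the blocked find fails with error 91
		// and is retried (two events), and the second find's retry adds a
		// third. The second find's initial attempt fails during connection
		// check out, before any command is started.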
		for i := 0; i < 3; i++ {
			cmdEvt := mt.GetStartedEvent()
			assert.NotNil(mt, cmdEvt, "expected a find event, got nil")
			assert.Equal(mt, cmdEvt.CommandName, "find",
				"expected a find event, got a(n) %v event", cmdEvt.CommandName)
		}
	})

	mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2")
	mt.RunOpts("retrying in sharded cluster", mtOpts, func(mt *mtest.T) {
		tests := []struct {
			name string

			// Note that setting this value greater than 2 will result in
			// false negatives. The current specification does not account for
			// CSOT, which might allow for an "infinite" number of retries
			// over a period of time. Because of this, we only track the
			// "previous server".
			hostCount            int
			failpointErrorCode   int32
			expectedFailCount    int
			expectedSuccessCount int
		}{
			{
				name:                 "retry on different mongos",
				hostCount:            2,
				failpointErrorCode:   6, // HostUnreachable
				expectedFailCount:    2,
				expectedSuccessCount: 0,
			},
			{
				name:                 "retry on same mongos",
				hostCount:            1,
				failpointErrorCode:   6, // HostUnreachable
				expectedFailCount:    1,
				expectedSuccessCount: 1,
			},
		}

		for _, tc := range tests {
			mt.Run(tc.name, func(mt *mtest.T) {
				hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
				require.GreaterOrEqualf(mt, len(hosts), tc.hostCount,
					"test cluster must have at least %v mongos hosts", tc.hostCount)

				// Configure the failpoint options for each mongos.
				failPoint := mtest.FailPoint{
					ConfigureFailPoint: "failCommand",
					Mode: mtest.FailPointMode{
						Times: 1,
					},
					Data: mtest.FailPointData{
						FailCommands:    []string{"find"},
						ErrorCode:       tc.failpointErrorCode,
						CloseConnection: false,
					},
				}
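
				// Error code 6 (HostUnreachable) is a retryable error. With
				// CloseConnection false, the connection stays usable, so the
				// mongos chosen for the retry is determined entirely by
				// server selection.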

				// To ensure that each of the hostCount-many mongos hosts is
				// tried at least once (i.e. failures are deprioritized), set
				// a failpoint on every mongos host. If we then observe
				// hostCount-many failures, by the pigeonhole principle every
				// mongos host must have been tried.
				for i := 0; i < tc.hostCount; i++ {
					mt.ResetClient(options.Client().SetHosts([]string{hosts[i]}))
					mt.SetFailPoint(failPoint)

					// The automatic failpoint clearing may not clear failpoints set on
					// specific hosts, so manually clear the failpoint we set on the
					// specific mongos when the test is done.
					defer mt.ResetClient(options.Client().SetHosts([]string{hosts[i]}))
					defer mt.ClearFailPoints()
				}

				failCount := 0
				successCount := 0

				commandMonitor := &event.CommandMonitor{
					Failed: func(context.Context, *event.CommandFailedEvent) {
						failCount++
					},
					Succeeded: func(context.Context, *event.CommandSucceededEvent) {
						successCount++
					},
				}

				// Reset the client with exactly hostCount-many mongos hosts.
				mt.ResetClient(options.Client().
					SetHosts(hosts[:tc.hostCount]).
					SetRetryReads(true).
					SetMonitor(commandMonitor))

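				// With both mongos hosts armed, the initial attempt and the
				// retry each hit a failpoint, so the find fails twice and
				// never succeeds. With one host armed, the retry returns to
				// the same mongos, whose failpoint was already consumed, so
				// the retry succeeds.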
				mt.Coll.FindOne(context.Background(), bson.D{})

				assert.Equal(mt, tc.expectedFailCount, failCount)
				assert.Equal(mt, tc.expectedSuccessCount, successCount)
			})
		}
	})
}
