
Source file src/go.mongodb.org/mongo-driver/mongo/integration/sdam_error_handling_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     7  //go:build go1.13
     8  // +build go1.13
    10  package integration
    12  import (
    13  	"context"
    14  	"errors"
    15  	"fmt"
    16  	"testing"
    17  	"time"
    19  	"go.mongodb.org/mongo-driver/bson"
    20  	"go.mongodb.org/mongo-driver/internal/assert"
    21  	"go.mongodb.org/mongo-driver/internal/eventtest"
    22  	"go.mongodb.org/mongo-driver/mongo"
    23  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    24  	"go.mongodb.org/mongo-driver/mongo/options"
    25  )
    27  func TestSDAMErrorHandling(t *testing.T) {
    28  	mt := mtest.New(t, noClientOpts)
    29  	baseClientOpts := func() *options.ClientOptions {
    30  		return options.Client().
    31  			ApplyURI(mtest.ClusterURI()).
    32  			SetRetryWrites(false).
    33  			SetWriteConcern(mtest.MajorityWc)
    34  	}
    35  	baseMtOpts := func() *mtest.Options {
    36  		mtOpts := mtest.NewOptions().
    37  			Topologies(mtest.ReplicaSet, mtest.Single). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
    38  			MinServerVersion("4.0").                    // 4.0+ is required to use failpoints on replica sets.
    39  			ClientOptions(baseClientOpts())
    41  		if mtest.ClusterTopologyKind() == mtest.Sharded {
    42  			// Pin to a single mongos because the tests use failpoints.
    43  			mtOpts.ClientType(mtest.Pinned)
    44  		}
    45  		return mtOpts
    46  	}
    48  	// Set min server version of 4.4 because the during-handshake tests use failpoint features introduced in 4.4 like
    49  	// blockConnection and appName.
    50  	mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
    51  		mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
    52  			mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
    53  				// Assert that the pool is cleared when a connection created by an application
    54  				// operation thread encounters a timeout caused by socketTimeoutMS during
    55  				// handshaking.
    57  				appName := "authConnectTimeoutTest"
    58  				// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
    59  				// speculative auth.
    60  				mt.SetFailPoint(mtest.FailPoint{
    61  					ConfigureFailPoint: "failCommand",
    62  					Mode: mtest.FailPointMode{
    63  						Times: 1,
    64  					},
    65  					Data: mtest.FailPointData{
    66  						FailCommands:    []string{"saslContinue"},
    67  						BlockConnection: true,
    68  						BlockTimeMS:     150,
    69  						AppName:         appName,
    70  					},
    71  				})
    73  				// Reset the client with the appName specified in the failpoint and the pool monitor.
    74  				tpm := eventtest.NewTestPoolMonitor()
    75  				mt.ResetClient(baseClientOpts().
    76  					SetAppName(appName).
    77  					SetPoolMonitor(tpm.PoolMonitor).
    78  					// Set a 100ms socket timeout so that the saslContinue delay of 150ms causes a
    79  					// timeout during socket read (i.e. a timeout not caused by the InsertOne context).
    80  					SetSocketTimeout(100 * time.Millisecond))
    82  				// Use context.Background() so that the new connection will not time out due to an
    83  				// operation-scoped timeout.
    84  				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
    85  				assert.NotNil(mt, err, "expected InsertOne error, got nil")
    86  				assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
    87  				assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
    88  				// Assert that the pool is cleared within 2 seconds.
    89  				assert.Soon(mt, func(ctx context.Context) {
    90  					ticker := time.NewTicker(100 * time.Millisecond)
    91  					defer ticker.Stop()
    93  					for {
    94  						select {
    95  						case <-ticker.C:
    96  						case <-ctx.Done():
    97  							return
    98  						}
   100  						if tpm.IsPoolCleared() {
   101  							return
   102  						}
   103  					}
   104  				}, 2*time.Second)
   105  			})
   107  			mt.RunOpts("pool cleared on non-timeout network error", noClientOpts, func(mt *mtest.T) {
   108  				mt.Run("background", func(mt *mtest.T) {
   109  					// Assert that the pool is cleared when a connection created by the background pool maintenance
   110  					// routine encounters a non-timeout network error during handshaking.
   111  					appName := "authNetworkErrorTestBackground"
   113  					mt.SetFailPoint(mtest.FailPoint{
   114  						ConfigureFailPoint: "failCommand",
   115  						Mode: mtest.FailPointMode{
   116  							Times: 1,
   117  						},
   118  						Data: mtest.FailPointData{
   119  							FailCommands:    []string{"saslContinue"},
   120  							CloseConnection: true,
   121  							AppName:         appName,
   122  						},
   123  					})
   125  					// Reset the client with the appName specified in the failpoint.
   126  					tpm := eventtest.NewTestPoolMonitor()
   127  					mt.ResetClient(baseClientOpts().
   128  						SetAppName(appName).
   129  						SetPoolMonitor(tpm.PoolMonitor).
   130  						// Set minPoolSize to enable the background pool maintenance goroutine.
   131  						SetMinPoolSize(5))
   133  					// Assert that the pool is cleared within 2 seconds.
   134  					assert.Soon(mt, func(ctx context.Context) {
   135  						ticker := time.NewTicker(100 * time.Millisecond)
   136  						defer ticker.Stop()
   138  						for {
   139  							select {
   140  							case <-ticker.C:
   141  							case <-ctx.Done():
   142  								return
   143  							}
   145  							if tpm.IsPoolCleared() {
   146  								return
   147  							}
   148  						}
   149  					}, 2*time.Second)
   150  				})
   152  				mt.Run("foreground", func(mt *mtest.T) {
   153  					// Assert that the pool is cleared when a connection created by an application thread connection
   154  					// checkout encounters a non-timeout network error during handshaking.
   155  					appName := "authNetworkErrorTestForeground"
   157  					mt.SetFailPoint(mtest.FailPoint{
   158  						ConfigureFailPoint: "failCommand",
   159  						Mode: mtest.FailPointMode{
   160  							Times: 1,
   161  						},
   162  						Data: mtest.FailPointData{
   163  							FailCommands:    []string{"saslContinue"},
   164  							CloseConnection: true,
   165  							AppName:         appName,
   166  						},
   167  					})
   169  					// Reset the client with the appName specified in the failpoint.
   170  					tpm := eventtest.NewTestPoolMonitor()
   171  					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
   173  					_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
   174  					assert.NotNil(mt, err, "expected InsertOne error, got nil")
   175  					assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
   177  					// Assert that the pool is cleared within 2 seconds.
   178  					assert.Soon(mt, func(ctx context.Context) {
   179  						ticker := time.NewTicker(100 * time.Millisecond)
   180  						defer ticker.Stop()
   182  						for {
   183  							select {
   184  							case <-ticker.C:
   185  							case <-ctx.Done():
   186  								return
   187  							}
   189  							if tpm.IsPoolCleared() {
   190  								return
   191  							}
   192  						}
   193  					}, 2*time.Second)
   194  				})
   195  			})
   196  		})
   197  	})
   198  	mt.RunOpts("after handshake completes", baseMtOpts(), func(mt *mtest.T) {
   199  		mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
   200  			mt.Run("pool cleared on non-timeout network error", func(mt *mtest.T) {
   201  				appName := "afterHandshakeNetworkError"
   203  				mt.SetFailPoint(mtest.FailPoint{
   204  					ConfigureFailPoint: "failCommand",
   205  					Mode: mtest.FailPointMode{
   206  						Times: 1,
   207  					},
   208  					Data: mtest.FailPointData{
   209  						FailCommands:    []string{"insert"},
   210  						CloseConnection: true,
   211  						AppName:         appName,
   212  					},
   213  				})
   215  				// Reset the client with the appName specified in the failpoint.
   216  				tpm := eventtest.NewTestPoolMonitor()
   217  				mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
   219  				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
   220  				assert.NotNil(mt, err, "expected InsertOne error, got nil")
   221  				assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
   222  				assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
   223  			})
   224  			mt.Run("pool not cleared on timeout network error", func(mt *mtest.T) {
   225  				tpm := eventtest.NewTestPoolMonitor()
   226  				mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))
   228  				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
   229  				assert.Nil(mt, err, "InsertOne error: %v", err)
   231  				filter := bson.M{
   232  					"$where": "function() { sleep(1000); return false; }",
   233  				}
   234  				timeoutCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
   235  				defer cancel()
   236  				_, err = mt.Coll.Find(timeoutCtx, filter)
   237  				assert.NotNil(mt, err, "expected Find error, got %v", err)
   238  				assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
   239  				assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
   240  			})
   241  			mt.Run("pool not cleared on context cancellation", func(mt *mtest.T) {
   242  				tpm := eventtest.NewTestPoolMonitor()
   243  				mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))
   245  				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
   246  				assert.Nil(mt, err, "InsertOne error: %v", err)
   248  				findCtx, cancel := context.WithCancel(context.Background())
   249  				go func() {
   250  					time.Sleep(100 * time.Millisecond)
   251  					cancel()
   252  				}()
   254  				filter := bson.M{
   255  					"$where": "function() { sleep(1000); return false; }",
   256  				}
   257  				_, err = mt.Coll.Find(findCtx, filter)
   258  				assert.NotNil(mt, err, "expected Find error, got nil")
   259  				assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
   260  				assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
   261  				assert.True(mt, errors.Is(err, context.Canceled), "expected error %v to be context.Canceled", err)
   262  				assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
   263  			})
   264  		})
   265  		mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
   266  			// Integration tests for the SDAM error handling code path for errors in server response documents. These
   267  			// errors can be part of the top-level document in ok:0 responses or in a nested writeConcernError document.
   269  			// On 4.4, some state change errors include a topologyVersion field. Because we're triggering these errors
   270  			// via failCommand, the topologyVersion does not actually change as it would in an actual state change.
   271  			// This causes the SDAM error handling code path to think we've already handled this state change and
   272  			// ignore the error because it's stale. To avoid this altogether, we cap the test to <= 4.2.
   273  			serverErrorsMtOpts := baseMtOpts().
   274  				MinServerVersion("4.0"). // failCommand support
   275  				MaxServerVersion("4.2").
   276  				ClientOptions(baseClientOpts().SetRetryWrites(false))
   278  			testCases := []struct {
   279  				name      string
   280  				errorCode int32
   282  				// For shutdown errors, the pool is always cleared. For non-shutdown errors, the pool is only cleared
   283  				// for pre-4.2 servers.
   284  				isShutdownError bool
   285  			}{
   286  				// "node is recovering" errors
   287  				{"InterruptedAtShutdown", 11600, true},
   288  				{"InterruptedDueToReplStateChange, not shutdown", 11602, false},
   289  				{"NotPrimaryOrSecondary", 13436, false},
   290  				{"PrimarySteppedDown", 189, false},
   291  				{"ShutdownInProgress", 91, true},
   293  				// "not primary" errors
   294  				{"NotPrimary", 10107, false},
   295  				{"NotPrimaryNoSecondaryOk", 13435, false},
   296  			}
   297  			for _, tc := range testCases {
   298  				mt.RunOpts(fmt.Sprintf("command error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
   299  					appName := fmt.Sprintf("command_error_%s", tc.name)
   301  					// Cause the next insert to fail with an ok:0 response.
   302  					mt.SetFailPoint(mtest.FailPoint{
   303  						ConfigureFailPoint: "failCommand",
   304  						Mode: mtest.FailPointMode{
   305  							Times: 1,
   306  						},
   307  						Data: mtest.FailPointData{
   308  							FailCommands: []string{"insert"},
   309  							ErrorCode:    tc.errorCode,
   310  							AppName:      appName,
   311  						},
   312  					})
   314  					// Reset the client with the appName specified in the failpoint.
   315  					tpm := eventtest.NewTestPoolMonitor()
   316  					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
   318  					runServerErrorsTest(mt, tc.isShutdownError, tpm)
   319  				})
   320  				mt.RunOpts(fmt.Sprintf("write concern error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
   321  					appName := fmt.Sprintf("write_concern_error_%s", tc.name)
   323  					// Cause the next insert to fail with a write concern error.
   324  					mt.SetFailPoint(mtest.FailPoint{
   325  						ConfigureFailPoint: "failCommand",
   326  						Mode: mtest.FailPointMode{
   327  							Times: 1,
   328  						},
   329  						Data: mtest.FailPointData{
   330  							FailCommands: []string{"insert"},
   331  							WriteConcernError: &mtest.WriteConcernErrorData{
   332  								Code: tc.errorCode,
   333  							},
   334  							AppName: appName,
   335  						},
   336  					})
   338  					// Reset the client with the appName specified in the failpoint.
   339  					tpm := eventtest.NewTestPoolMonitor()
   340  					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
   342  					runServerErrorsTest(mt, tc.isShutdownError, tpm)
   343  				})
   344  			}
   345  		})
   346  	})
   347  }
   349  func runServerErrorsTest(mt *mtest.T, isShutdownError bool, tpm *eventtest.TestPoolMonitor) {
   350  	mt.Helper()
   352  	_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
   353  	assert.NotNil(mt, err, "expected InsertOne error, got nil")
   355  	// The pool should always be cleared for shutdown errors, regardless of server version.
   356  	if isShutdownError {
   357  		assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared, but was not")
   358  		return
   359  	}
   361  	// For non-shutdown errors, the pool is only cleared if the error is from a pre-4.2 server.
   362  	wantCleared := mtest.CompareServerVersions(mtest.ServerVersion(), "4.2") < 0
   363  	gotCleared := tpm.IsPoolCleared()
   364  	assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %t; pool was cleared: %t",
   365  		wantCleared, gotCleared)
   366  }

View as plain text