...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/retryable_writes_prose_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package integration

import (
	"bytes"
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/bsontype"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/internal/assert"
	"go.mongodb.org/mongo-driver/internal/eventtest"
	"go.mongodb.org/mongo-driver/internal/require"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/x/mongo/driver"
)

func TestRetryableWritesProse(t *testing.T) {
	clientOpts := options.Client().SetRetryWrites(true).SetWriteConcern(mtest.MajorityWc).
		SetReadConcern(mtest.MajorityRc)
	mtOpts := mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("3.6").CreateClient(false)
	mt := mtest.New(t, mtOpts)

	includeOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet, mtest.Sharded).CreateClient(false)
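	// Test that supported single-statement write operations include a
	// transaction number in the command sent to the server, and that
	// multi-statement operations (deleteMany, updateMany) omit it.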
	mt.RunOpts("txn number included", includeOpts, func(mt *mtest.T) {
		updateDoc := bson.D{{"$inc", bson.D{{"x", 1}}}}
		insertOneDoc := bson.D{{"x", 1}}
		insertManyOrderedArgs := bson.D{
			{"options", bson.D{{"ordered", true}}},
			{"documents", []interface{}{insertOneDoc}},
		}
		insertManyUnorderedArgs := bson.D{
			{"options", bson.D{{"ordered", false}}},
			{"documents", []interface{}{insertOneDoc}},
		}
		testCases := []struct {
			operationName   string
			args            bson.D
			expectTxnNumber bool
		}{
			{"deleteOne", bson.D{}, true},
			{"deleteMany", bson.D{}, false},
			{"updateOne", bson.D{{"update", updateDoc}}, true},
			{"updateMany", bson.D{{"update", updateDoc}}, false},
			{"replaceOne", bson.D{}, true},
			{"insertOne", bson.D{{"document", insertOneDoc}}, true},
			{"insertMany", insertManyOrderedArgs, true},
			{"insertMany", insertManyUnorderedArgs, true},
			{"findOneAndReplace", bson.D{}, true},
			{"findOneAndUpdate", bson.D{{"update", updateDoc}}, true},
			{"findOneAndDelete", bson.D{}, true},
		}
		for _, tc := range testCases {
			mt.Run(tc.operationName, func(mt *mtest.T) {
				tcArgs, err := bson.Marshal(tc.args)
				assert.Nil(mt, err, "Marshal error: %v", err)
				crudOp := crudOperation{
					Name:      tc.operationName,
					Arguments: tcArgs,
				}

				mt.ClearEvents()
				runCrudOperation(mt, "", crudOp, crudOutcome{})
				started := mt.GetStartedEvent()
				assert.NotNil(mt, started, "expected CommandStartedEvent, got nil")
				_, err = started.Command.LookupErr("txnNumber")
				if tc.expectTxnNumber {
					assert.Nil(mt, err, "expected txnNumber in command %v", started.Command)
					return
				}
				assert.NotNil(mt, err, "did not expect txnNumber in command %v", started.Command)
			})
		}
	})
	errorOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet, mtest.Sharded)
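	// Test that writes against the mmapv1 storage engine, which does not
	// support retryable writes, return driver.ErrUnsupportedStorageEngine.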
	mt.RunOpts("wrap mmapv1 error", errorOpts, func(mt *mtest.T) {
		res, err := mt.DB.RunCommand(context.Background(), bson.D{{"serverStatus", 1}}).Raw()
		assert.Nil(mt, err, "serverStatus error: %v", err)
		storageEngine, ok := res.Lookup("storageEngine", "name").StringValueOK()
		if !ok || storageEngine != "mmapv1" {
			mt.Skip("skipping because storage engine is not mmapv1")
		}

		_, err = mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
		assert.Equal(mt, driver.ErrUnsupportedStorageEngine, err,
			"expected error %v, got %v", driver.ErrUnsupportedStorageEngine, err)
	})

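	// Test that a transaction number is never sent for writes against a
	// standalone, which does not support retryable writes.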
	standaloneOpts := mtest.NewOptions().Topologies(mtest.Single).CreateClient(false)
	mt.RunOpts("transaction number not sent on writes", standaloneOpts, func(mt *mtest.T) {
		mt.Run("explicit session", func(mt *mtest.T) {
			// Standalones do not support retryable writes and will error if a transaction number is sent

			sess, err := mt.Client.StartSession()
			assert.Nil(mt, err, "StartSession error: %v", err)
			defer sess.EndSession(context.Background())

			mt.ClearEvents()

			err = mongo.WithSession(context.Background(), sess, func(ctx mongo.SessionContext) error {
				doc := bson.D{{"foo", 1}}
				_, err := mt.Coll.InsertOne(ctx, doc)
				return err
			})
			assert.Nil(mt, err, "InsertOne error: %v", err)

			_, wantID := sess.ID().Lookup("id").Binary()
			command := mt.GetStartedEvent().Command
			lsid, err := command.LookupErr("lsid")
			assert.Nil(mt, err, "Error getting lsid: %v", err)
			_, gotID := lsid.Document().Lookup("id").Binary()
			assert.True(mt, bytes.Equal(wantID, gotID), "expected session ID %v, got %v", wantID, gotID)
			txnNumber, err := command.LookupErr("txnNumber")
			assert.NotNil(mt, err, "expected no txnNumber, got %v", txnNumber)
		})
		mt.Run("implicit session", func(mt *mtest.T) {
			// Standalones do not support retryable writes and will error if a transaction number is sent

			mt.ClearEvents()

			doc := bson.D{{"foo", 1}}
			_, err := mt.Coll.InsertOne(context.Background(), doc)
			assert.Nil(mt, err, "InsertOne error: %v", err)

			command := mt.GetStartedEvent().Command
			lsid, err := command.LookupErr("lsid")
			assert.Nil(mt, err, "Error getting lsid: %v", err)
			_, gotID := lsid.Document().Lookup("id").Binary()
			assert.NotNil(mt, gotID, "expected session ID, got nil")
			txnNumber, err := command.LookupErr("txnNumber")
			assert.NotNil(mt, err, "expected no txnNumber, got %v", txnNumber)
		})
	})

	tpm := eventtest.NewTestPoolMonitor()
	// Client options with MaxPoolSize of 1 and RetryWrites used per the test description.
	// Lower HeartbeatInterval used to speed the test up for any server that uses streaming
	// heartbeats. Only connect to first host in list for sharded clusters.
	hosts := mtest.ClusterConnString().Hosts
	pceOpts := options.Client().SetMaxPoolSize(1).SetRetryWrites(true).
		SetPoolMonitor(tpm.PoolMonitor).SetHeartbeatInterval(500 * time.Millisecond).
		SetHosts(hosts[:1])

	mtPceOpts := mtest.NewOptions().ClientOptions(pceOpts).MinServerVersion("4.3").
		Topologies(mtest.ReplicaSet, mtest.Sharded)
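	// Test that a write that fails because the connection pool was cleared is
	// retried. With MaxPoolSize of 1, the fail point below makes the first
	// insert block long enough that the concurrent insert's connection
	// check-out fails when the pool is cleared; both inserts should still
	// succeed via retry.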
	mt.RunOpts("PoolClearedError retryability", mtPceOpts, func(mt *mtest.T) {
		// Force the insert command to block for 1 second and fail with a
		// retryable error once.
		mt.SetFailPoint(mtest.FailPoint{
			ConfigureFailPoint: "failCommand",
			Mode: mtest.FailPointMode{
				Times: 1,
			},
			Data: mtest.FailPointData{
				FailCommands:    []string{"insert"},
				ErrorCode:       91,
				BlockConnection: true,
				BlockTimeMS:     1000,
				ErrorLabels:     &[]string{"RetryableWriteError"},
			},
		})

		// Clear CMAP and command events.
		tpm.ClearEvents()
		mt.ClearEvents()

		// Perform an InsertOne on two different threads and assert both operations are
		// successful.
		var wg sync.WaitGroup
		for i := 0; i < 2; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
				assert.Nil(mt, err, "InsertOne error: %v", err)
			}()
		}
		wg.Wait()

		// Gather GetSucceeded, GetFailed and PoolCleared pool events.
		events := tpm.Events(func(e *event.PoolEvent) bool {
			getSucceeded := e.Type == event.GetSucceeded
			getFailed := e.Type == event.GetFailed
			poolCleared := e.Type == event.PoolCleared
			return getSucceeded || getFailed || poolCleared
		})

		// Assert that the first check out succeeds, the pool is cleared, and the
		// second check out fails due to a connection error.
		assert.True(mt, len(events) >= 3, "expected at least 3 events, got %v", len(events))
		assert.Equal(mt, event.GetSucceeded, events[0].Type,
			"expected ConnectionCheckedOut event, got %v", events[0].Type)
		assert.Equal(mt, event.PoolCleared, events[1].Type,
			"expected ConnectionPoolCleared event, got %v", events[1].Type)
		assert.Equal(mt, event.GetFailed, events[2].Type,
			"expected ConnectionCheckedOutFailed event, got %v", events[2].Type)
		assert.Equal(mt, event.ReasonConnectionErrored, events[2].Reason,
			"expected check out failure due to connection error, failed due to %q", events[2].Reason)

		// Assert that three insert CommandStartedEvents were observed: the
		// failed attempt, its retry, and the other insert.
		for i := 0; i < 3; i++ {
			cmdEvt := mt.GetStartedEvent()
			assert.NotNil(mt, cmdEvt, "expected an insert event, got nil")
			assert.Equal(mt, cmdEvt.CommandName, "insert",
				"expected an insert event, got a(n) %v event", cmdEvt.CommandName)
		}
	})

	mtNWPOpts := mtest.NewOptions().MinServerVersion("6.0").Topologies(mtest.ReplicaSet)
	mt.RunOpts(fmt.Sprintf("%s label returns original error", driver.NoWritesPerformed), mtNWPOpts,
		func(mt *mtest.T) {
			const shutdownInProgressErrorCode int32 = 91
			const notWritablePrimaryErrorCode int32 = 10107

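			// This test makes the initial insert fail with a retryable
			// "ShutdownInProgress" write concern error and makes the retry fail
			// with an error labeled NoWritesPerformed. The driver should return
			// the original "ShutdownInProgress" error, not the retry's error.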
			monitor := new(event.CommandMonitor)
			mt.ResetClient(options.Client().SetRetryWrites(true).SetMonitor(monitor))

			// Configure a fail point for a "ShutdownInProgress" error.
			mt.SetFailPoint(mtest.FailPoint{
				ConfigureFailPoint: "failCommand",
				Mode:               mtest.FailPointMode{Times: 1},
				Data: mtest.FailPointData{
					WriteConcernError: &mtest.WriteConcernErrorData{
						Code: shutdownInProgressErrorCode,
					},
					FailCommands: []string{"insert"},
				},
			})

			// secondFailPointConfigured tracks whether the "ShutdownInProgress"
			// write concern error actually caused the "NotWritablePrimary" fail
			// point below to be configured.
			var secondFailPointConfigured bool

			// Set a command monitor on the client that configures a fail point
			// returning a "NoWritesPerformed"-labeled error after it observes a
			// "ShutdownInProgress" write concern error.
			monitor.Succeeded = func(_ context.Context, evt *event.CommandSucceededEvent) {
				var errorCode int32
				if wce := evt.Reply.Lookup("writeConcernError"); wce.Type == bsontype.EmbeddedDocument {
					var ok bool
					errorCode, ok = wce.Document().Lookup("code").Int32OK()
					if !ok {
						t.Fatalf("expected code to be an int32, got %v",
							wce.Document().Lookup("code").Type)
						return
					}
				}

				// Do not set a fail point if the event was not a writeConcernError with an
				// error code for "ShutdownInProgress".
				if errorCode != shutdownInProgressErrorCode {
					return
				}

				mt.SetFailPoint(mtest.FailPoint{
					ConfigureFailPoint: "failCommand",
					Mode:               mtest.FailPointMode{Times: 1},
					Data: mtest.FailPointData{
						ErrorCode: notWritablePrimaryErrorCode,
						ErrorLabels: &[]string{
							driver.NoWritesPerformed,
							driver.RetryableWriteError,
						},
						FailCommands: []string{"insert"},
					},
				})
				secondFailPointConfigured = true
			}

			// Attempt to insert a document.
			_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})

			require.True(mt, secondFailPointConfigured)

			// Assert that the "ShutdownInProgress" error is returned.
			require.True(mt, err.(mongo.WriteException).HasErrorCode(int(shutdownInProgressErrorCode)))
		})

	mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2")
	mt.RunOpts("retrying in sharded cluster", mtOpts, func(mt *mtest.T) {
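		// Test that in a sharded cluster, a retry attempts a different mongos
		// when one is available (failed hosts are deprioritized) and reuses the
		// same mongos when no other host is available.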
		tests := []struct {
			name string

			// Note that setting this value greater than 2 will result in false
			// negatives. The current specification does not account for CSOT, which
			// might allow for an "infinite" number of retries over a period of time.
			// Because of this, we only track the "previous server".
			hostCount            int
			failpointErrorCode   int32
			expectedFailCount    int
			expectedSuccessCount int
		}{
			{
				name:                 "retry on different mongos",
				hostCount:            2,
				failpointErrorCode:   6, // HostUnreachable
				expectedFailCount:    2,
				expectedSuccessCount: 0,
			},
			{
				name:                 "retry on same mongos",
				hostCount:            1,
				failpointErrorCode:   6, // HostUnreachable
				expectedFailCount:    1,
				expectedSuccessCount: 1,
			},
		}

		for _, tc := range tests {
			mt.Run(tc.name, func(mt *mtest.T) {
				hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
				require.GreaterOrEqualf(mt, len(hosts), tc.hostCount,
					"test cluster must have at least %v mongos hosts", tc.hostCount)

				// Configure the failpoint options for each mongos.
				failPoint := mtest.FailPoint{
					ConfigureFailPoint: "failCommand",
					Mode: mtest.FailPointMode{
						Times: 1,
					},
					Data: mtest.FailPointData{
						FailCommands:    []string{"insert"},
						ErrorLabels:     &[]string{"RetryableWriteError"},
						ErrorCode:       tc.failpointErrorCode,
						CloseConnection: false,
					},
				}

				// To ensure that each of the hostCount-many mongos hosts is tried at
				// least once (i.e. failures are deprioritized), we set a failpoint on
				// all mongos hosts. The idea is that if we get hostCount-many
				// failures, then by the pigeonhole principle all mongos hosts must
				// have been tried.
				for i := 0; i < tc.hostCount; i++ {
					mt.ResetClient(options.Client().SetHosts([]string{hosts[i]}))
					mt.SetFailPoint(failPoint)

					// The automatic failpoint clearing may not clear failpoints set on
					// specific hosts, so manually clear the failpoint we set on the
					// specific mongos when the test is done.
					defer mt.ResetClient(options.Client().SetHosts([]string{hosts[i]}))
					defer mt.ClearFailPoints()
				}

				failCount := 0
				successCount := 0

				commandMonitor := &event.CommandMonitor{
					Failed: func(context.Context, *event.CommandFailedEvent) {
						failCount++
					},
					Succeeded: func(context.Context, *event.CommandSucceededEvent) {
						successCount++
					},
				}

				// Reset the client with exactly hostCount-many mongos hosts.
				mt.ResetClient(options.Client().
					SetHosts(hosts[:tc.hostCount]).
					SetRetryWrites(true).
					SetMonitor(commandMonitor))

				_, _ = mt.Coll.InsertOne(context.Background(), bson.D{})

				assert.Equal(mt, tc.expectedFailCount, failCount)
				assert.Equal(mt, tc.expectedSuccessCount, successCount)
			})
		}
	})
}
