...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/primary_stepdown_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"context"
    11  	"testing"
    12  
    13  	"go.mongodb.org/mongo-driver/bson"
    14  	"go.mongodb.org/mongo-driver/internal/assert"
    15  	"go.mongodb.org/mongo-driver/internal/eventtest"
    16  	"go.mongodb.org/mongo-driver/mongo"
    17  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    18  	"go.mongodb.org/mongo-driver/mongo/options"
    19  )
    20  
    21  const (
    22  	errorNotPrimary            int32 = 10107
    23  	errorShutdownInProgress    int32 = 91
    24  	errorInterruptedAtShutdown int32 = 11600
    25  )
    26  
    27  func TestConnectionsSurvivePrimaryStepDown(t *testing.T) {
    28  	mt := mtest.New(t, mtest.NewOptions().Topologies(mtest.ReplicaSet).CreateClient(false))
    29  
    30  	getMoreOpts := mtest.NewOptions().MinServerVersion("4.2")
    31  	mt.RunOpts("getMore iteration", getMoreOpts, func(mt *mtest.T) {
    32  		tpm := eventtest.NewTestPoolMonitor()
    33  		mt.ResetClient(options.Client().
    34  			SetRetryWrites(false).
    35  			SetPoolMonitor(tpm.PoolMonitor))
    36  
    37  		initCollection(mt, mt.Coll)
    38  		cur, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2))
    39  		assert.Nil(mt, err, "Find error: %v", err)
    40  		defer cur.Close(context.Background())
    41  		assert.True(mt, cur.Next(context.Background()), "expected Next true, got false")
    42  
    43  		// replSetStepDown can fail with transient errors, so we use executeAdminCommandWithRetry to handle them and
    44  		// retry until a timeout is hit.
    45  		stepDownCmd := bson.D{
    46  			{"replSetStepDown", 5},
    47  			{"force", true},
    48  		}
    49  		stepDownOpts := options.RunCmd().SetReadPreference(mtest.PrimaryRp)
    50  		executeAdminCommandWithRetry(mt, mt.Client, stepDownCmd, stepDownOpts)
    51  
    52  		assert.True(mt, cur.Next(context.Background()), "expected Next true, got false")
    53  		assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
    54  	})
    55  	mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
    56  		testCases := []struct {
    57  			name                   string
    58  			minVersion, maxVersion string
    59  			errCode                int32
    60  			poolCleared            bool
    61  		}{
    62  			{"notPrimary keep pool", "4.2", "", errorNotPrimary, false},
    63  			{"notPrimary reset pool", "4.0", "4.0", errorNotPrimary, true},
    64  			{"shutdown in progress reset pool", "4.0", "", errorShutdownInProgress, true},
    65  			{"interrupted at shutdown reset pool", "4.0", "", errorInterruptedAtShutdown, true},
    66  		}
    67  		for _, tc := range testCases {
    68  			opts := mtest.NewOptions()
    69  			if tc.minVersion != "" {
    70  				opts.MinServerVersion(tc.minVersion)
    71  			}
    72  			if tc.maxVersion != "" {
    73  				opts.MaxServerVersion(tc.maxVersion)
    74  			}
    75  			mt.RunOpts(tc.name, opts, func(mt *mtest.T) {
    76  				tpm := eventtest.NewTestPoolMonitor()
    77  				mt.ResetClient(options.Client().
    78  					SetRetryWrites(false).
    79  					SetPoolMonitor(tpm.PoolMonitor).
    80  					// Use a low heartbeat frequency so the Client will quickly recover when using
    81  					// failpoints that cause SDAM state changes.
    82  					SetHeartbeatInterval(defaultHeartbeatInterval))
    83  
    84  				mt.SetFailPoint(mtest.FailPoint{
    85  					ConfigureFailPoint: "failCommand",
    86  					Mode: mtest.FailPointMode{
    87  						Times: 1,
    88  					},
    89  					Data: mtest.FailPointData{
    90  						FailCommands: []string{"insert"},
    91  						ErrorCode:    tc.errCode,
    92  					},
    93  				})
    94  
    95  				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
    96  				assert.NotNil(mt, err, "expected InsertOne error, got nil")
    97  				cerr, ok := err.(mongo.CommandError)
    98  				assert.True(mt, ok, "expected error type %v, got %v", mongo.CommandError{}, err)
    99  				assert.Equal(mt, tc.errCode, cerr.Code, "expected error code %v, got %v", tc.errCode, cerr.Code)
   100  
   101  				if tc.poolCleared {
   102  					assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
   103  					return
   104  				}
   105  
   106  				// if pool shouldn't be cleared, another operation should succeed
   107  				_, err = mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
   108  				assert.Nil(mt, err, "InsertOne error: %v", err)
   109  				assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
   110  			})
   111  		}
   112  	})
   113  }
   114  

View as plain text