1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "testing"
12
13 "go.mongodb.org/mongo-driver/bson"
14 "go.mongodb.org/mongo-driver/internal/assert"
15 "go.mongodb.org/mongo-driver/internal/eventtest"
16 "go.mongodb.org/mongo-driver/mongo"
17 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
18 "go.mongodb.org/mongo-driver/mongo/options"
19 )
20
21 const (
22 errorNotPrimary int32 = 10107
23 errorShutdownInProgress int32 = 91
24 errorInterruptedAtShutdown int32 = 11600
25 )
26
27 func TestConnectionsSurvivePrimaryStepDown(t *testing.T) {
28 mt := mtest.New(t, mtest.NewOptions().Topologies(mtest.ReplicaSet).CreateClient(false))
29
30 getMoreOpts := mtest.NewOptions().MinServerVersion("4.2")
31 mt.RunOpts("getMore iteration", getMoreOpts, func(mt *mtest.T) {
32 tpm := eventtest.NewTestPoolMonitor()
33 mt.ResetClient(options.Client().
34 SetRetryWrites(false).
35 SetPoolMonitor(tpm.PoolMonitor))
36
37 initCollection(mt, mt.Coll)
38 cur, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2))
39 assert.Nil(mt, err, "Find error: %v", err)
40 defer cur.Close(context.Background())
41 assert.True(mt, cur.Next(context.Background()), "expected Next true, got false")
42
43
44
45 stepDownCmd := bson.D{
46 {"replSetStepDown", 5},
47 {"force", true},
48 }
49 stepDownOpts := options.RunCmd().SetReadPreference(mtest.PrimaryRp)
50 executeAdminCommandWithRetry(mt, mt.Client, stepDownCmd, stepDownOpts)
51
52 assert.True(mt, cur.Next(context.Background()), "expected Next true, got false")
53 assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
54 })
55 mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
56 testCases := []struct {
57 name string
58 minVersion, maxVersion string
59 errCode int32
60 poolCleared bool
61 }{
62 {"notPrimary keep pool", "4.2", "", errorNotPrimary, false},
63 {"notPrimary reset pool", "4.0", "4.0", errorNotPrimary, true},
64 {"shutdown in progress reset pool", "4.0", "", errorShutdownInProgress, true},
65 {"interrupted at shutdown reset pool", "4.0", "", errorInterruptedAtShutdown, true},
66 }
67 for _, tc := range testCases {
68 opts := mtest.NewOptions()
69 if tc.minVersion != "" {
70 opts.MinServerVersion(tc.minVersion)
71 }
72 if tc.maxVersion != "" {
73 opts.MaxServerVersion(tc.maxVersion)
74 }
75 mt.RunOpts(tc.name, opts, func(mt *mtest.T) {
76 tpm := eventtest.NewTestPoolMonitor()
77 mt.ResetClient(options.Client().
78 SetRetryWrites(false).
79 SetPoolMonitor(tpm.PoolMonitor).
80
81
82 SetHeartbeatInterval(defaultHeartbeatInterval))
83
84 mt.SetFailPoint(mtest.FailPoint{
85 ConfigureFailPoint: "failCommand",
86 Mode: mtest.FailPointMode{
87 Times: 1,
88 },
89 Data: mtest.FailPointData{
90 FailCommands: []string{"insert"},
91 ErrorCode: tc.errCode,
92 },
93 })
94
95 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
96 assert.NotNil(mt, err, "expected InsertOne error, got nil")
97 cerr, ok := err.(mongo.CommandError)
98 assert.True(mt, ok, "expected error type %v, got %v", mongo.CommandError{}, err)
99 assert.Equal(mt, tc.errCode, cerr.Code, "expected error code %v, got %v", tc.errCode, cerr.Code)
100
101 if tc.poolCleared {
102 assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
103 return
104 }
105
106
107 _, err = mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
108 assert.Nil(mt, err, "InsertOne error: %v", err)
109 assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
110 })
111 }
112 })
113 }
114
View as plain text