1
2
3
4
5
6
7 package integration
8
9 import (
10 "bytes"
11 "context"
12 "fmt"
13 "sync"
14 "testing"
15 "time"
16
17 "go.mongodb.org/mongo-driver/bson"
18 "go.mongodb.org/mongo-driver/bson/bsontype"
19 "go.mongodb.org/mongo-driver/event"
20 "go.mongodb.org/mongo-driver/internal/assert"
21 "go.mongodb.org/mongo-driver/internal/eventtest"
22 "go.mongodb.org/mongo-driver/internal/require"
23 "go.mongodb.org/mongo-driver/mongo"
24 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
25 "go.mongodb.org/mongo-driver/mongo/options"
26 "go.mongodb.org/mongo-driver/x/mongo/driver"
27 )
28
29 func TestRetryableWritesProse(t *testing.T) {
30 clientOpts := options.Client().SetRetryWrites(true).SetWriteConcern(mtest.MajorityWc).
31 SetReadConcern(mtest.MajorityRc)
32 mtOpts := mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("3.6").CreateClient(false)
33 mt := mtest.New(t, mtOpts)
34
35 includeOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet, mtest.Sharded).CreateClient(false)
36 mt.RunOpts("txn number included", includeOpts, func(mt *mtest.T) {
37 updateDoc := bson.D{{"$inc", bson.D{{"x", 1}}}}
38 insertOneDoc := bson.D{{"x", 1}}
39 insertManyOrderedArgs := bson.D{
40 {"options", bson.D{{"ordered", true}}},
41 {"documents", []interface{}{insertOneDoc}},
42 }
43 insertManyUnorderedArgs := bson.D{
44 {"options", bson.D{{"ordered", true}}},
45 {"documents", []interface{}{insertOneDoc}},
46 }
47
48 testCases := []struct {
49 operationName string
50 args bson.D
51 expectTxnNumber bool
52 }{
53 {"deleteOne", bson.D{}, true},
54 {"deleteMany", bson.D{}, false},
55 {"updateOne", bson.D{{"update", updateDoc}}, true},
56 {"updateMany", bson.D{{"update", updateDoc}}, false},
57 {"replaceOne", bson.D{}, true},
58 {"insertOne", bson.D{{"document", insertOneDoc}}, true},
59 {"insertMany", insertManyOrderedArgs, true},
60 {"insertMany", insertManyUnorderedArgs, true},
61 {"findOneAndReplace", bson.D{}, true},
62 {"findOneAndUpdate", bson.D{{"update", updateDoc}}, true},
63 {"findOneAndDelete", bson.D{}, true},
64 }
65 for _, tc := range testCases {
66 mt.Run(tc.operationName, func(mt *mtest.T) {
67 tcArgs, err := bson.Marshal(tc.args)
68 assert.Nil(mt, err, "Marshal error: %v", err)
69 crudOp := crudOperation{
70 Name: tc.operationName,
71 Arguments: tcArgs,
72 }
73
74 mt.ClearEvents()
75 runCrudOperation(mt, "", crudOp, crudOutcome{})
76 started := mt.GetStartedEvent()
77 assert.NotNil(mt, started, "expected CommandStartedEvent, got nil")
78 _, err = started.Command.LookupErr("txnNumber")
79 if tc.expectTxnNumber {
80 assert.Nil(mt, err, "expected txnNumber in command %v", started.Command)
81 return
82 }
83 assert.NotNil(mt, err, "did not expect txnNumber in command %v", started.Command)
84 })
85 }
86 })
87 errorOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet, mtest.Sharded)
88 mt.RunOpts("wrap mmapv1 error", errorOpts, func(mt *mtest.T) {
89 res, err := mt.DB.RunCommand(context.Background(), bson.D{{"serverStatus", 1}}).Raw()
90 assert.Nil(mt, err, "serverStatus error: %v", err)
91 storageEngine, ok := res.Lookup("storageEngine", "name").StringValueOK()
92 if !ok || storageEngine != "mmapv1" {
93 mt.Skip("skipping because storage engine is not mmapv1")
94 }
95
96 _, err = mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
97 assert.Equal(mt, driver.ErrUnsupportedStorageEngine, err,
98 "expected error %v, got %v", driver.ErrUnsupportedStorageEngine, err)
99 })
100
101 standaloneOpts := mtest.NewOptions().Topologies(mtest.Single).CreateClient(false)
102 mt.RunOpts("transaction number not sent on writes", standaloneOpts, func(mt *mtest.T) {
103 mt.Run("explicit session", func(mt *mtest.T) {
104
105
106 sess, err := mt.Client.StartSession()
107 assert.Nil(mt, err, "StartSession error: %v", err)
108 defer sess.EndSession(context.Background())
109
110 mt.ClearEvents()
111
112 err = mongo.WithSession(context.Background(), sess, func(ctx mongo.SessionContext) error {
113 doc := bson.D{{"foo", 1}}
114 _, err := mt.Coll.InsertOne(ctx, doc)
115 return err
116 })
117 assert.Nil(mt, err, "InsertOne error: %v", err)
118
119 _, wantID := sess.ID().Lookup("id").Binary()
120 command := mt.GetStartedEvent().Command
121 lsid, err := command.LookupErr("lsid")
122 assert.Nil(mt, err, "Error getting lsid: %v", err)
123 _, gotID := lsid.Document().Lookup("id").Binary()
124 assert.True(mt, bytes.Equal(wantID, gotID), "expected session ID %v, got %v", wantID, gotID)
125 txnNumber, err := command.LookupErr("txnNumber")
126 assert.NotNil(mt, err, "expected no txnNumber, got %v", txnNumber)
127 })
128 mt.Run("implicit session", func(mt *mtest.T) {
129
130
131 mt.ClearEvents()
132
133 doc := bson.D{{"foo", 1}}
134 _, err := mt.Coll.InsertOne(context.Background(), doc)
135 assert.Nil(mt, err, "InsertOne error: %v", err)
136
137 command := mt.GetStartedEvent().Command
138 lsid, err := command.LookupErr("lsid")
139 assert.Nil(mt, err, "Error getting lsid: %v", err)
140 _, gotID := lsid.Document().Lookup("id").Binary()
141 assert.NotNil(mt, gotID, "expected session ID, got nil")
142 txnNumber, err := command.LookupErr("txnNumber")
143 assert.NotNil(mt, err, "expected no txnNumber, got %v", txnNumber)
144 })
145 })
146
147 tpm := eventtest.NewTestPoolMonitor()
148
149
150
151 hosts := mtest.ClusterConnString().Hosts
152 pceOpts := options.Client().SetMaxPoolSize(1).SetRetryWrites(true).
153 SetPoolMonitor(tpm.PoolMonitor).SetHeartbeatInterval(500 * time.Millisecond).
154 SetHosts(hosts[:1])
155
156 mtPceOpts := mtest.NewOptions().ClientOptions(pceOpts).MinServerVersion("4.3").
157 Topologies(mtest.ReplicaSet, mtest.Sharded)
158 mt.RunOpts("PoolClearedError retryability", mtPceOpts, func(mt *mtest.T) {
159
160 mt.SetFailPoint(mtest.FailPoint{
161 ConfigureFailPoint: "failCommand",
162 Mode: mtest.FailPointMode{
163 Times: 1,
164 },
165 Data: mtest.FailPointData{
166 FailCommands: []string{"insert"},
167 ErrorCode: 91,
168 BlockConnection: true,
169 BlockTimeMS: 1000,
170 ErrorLabels: &[]string{"RetryableWriteError"},
171 },
172 })
173
174
175 tpm.ClearEvents()
176 mt.ClearEvents()
177
178
179
180 var wg sync.WaitGroup
181 for i := 0; i < 2; i++ {
182 wg.Add(1)
183 go func() {
184 defer wg.Done()
185 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
186 assert.Nil(mt, err, "InsertOne error: %v", err)
187 }()
188 }
189 wg.Wait()
190
191
192 events := tpm.Events(func(e *event.PoolEvent) bool {
193 getSucceeded := e.Type == event.GetSucceeded
194 getFailed := e.Type == event.GetFailed
195 poolCleared := e.Type == event.PoolCleared
196 return getSucceeded || getFailed || poolCleared
197 })
198
199
200
201 assert.True(mt, len(events) >= 3, "expected at least 3 events, got %v", len(events))
202 assert.Equal(mt, event.GetSucceeded, events[0].Type,
203 "expected ConnectionCheckedOut event, got %v", events[0].Type)
204 assert.Equal(mt, event.PoolCleared, events[1].Type,
205 "expected ConnectionPoolCleared event, got %v", events[1].Type)
206 assert.Equal(mt, event.GetFailed, events[2].Type,
207 "expected ConnectionCheckedOutFailed event, got %v", events[2].Type)
208 assert.Equal(mt, event.ReasonConnectionErrored, events[2].Reason,
209 "expected check out failure due to connection error, failed due to %q", events[2].Reason)
210
211
212 for i := 0; i < 3; i++ {
213 cmdEvt := mt.GetStartedEvent()
214 assert.NotNil(mt, cmdEvt, "expected an insert event, got nil")
215 assert.Equal(mt, cmdEvt.CommandName, "insert",
216 "expected an insert event, got a(n) %v event", cmdEvt.CommandName)
217 }
218 })
219
220 mtNWPOpts := mtest.NewOptions().MinServerVersion("6.0").Topologies(mtest.ReplicaSet)
221 mt.RunOpts(fmt.Sprintf("%s label returns original error", driver.NoWritesPerformed), mtNWPOpts,
222 func(mt *mtest.T) {
223 const shutdownInProgressErrorCode int32 = 91
224 const notWritablePrimaryErrorCode int32 = 10107
225
226 monitor := new(event.CommandMonitor)
227 mt.ResetClient(options.Client().SetRetryWrites(true).SetMonitor(monitor))
228
229
230 mt.SetFailPoint(mtest.FailPoint{
231 ConfigureFailPoint: "failCommand",
232 Mode: mtest.FailPointMode{Times: 1},
233 Data: mtest.FailPointData{
234 WriteConcernError: &mtest.WriteConcernErrorData{
235 Code: shutdownInProgressErrorCode,
236 },
237 FailCommands: []string{"insert"},
238 },
239 })
240
241
242
243 var secondFailPointConfigured bool
244
245
246 monitor.Succeeded = func(_ context.Context, evt *event.CommandSucceededEvent) {
247 var errorCode int32
248 if wce := evt.Reply.Lookup("writeConcernError"); wce.Type == bsontype.EmbeddedDocument {
249 var ok bool
250 errorCode, ok = wce.Document().Lookup("code").Int32OK()
251 if !ok {
252 t.Fatalf("expected code to be an int32, got %v",
253 wce.Document().Lookup("code").Type)
254 return
255 }
256 }
257
258
259
260 if errorCode != shutdownInProgressErrorCode {
261 return
262 }
263
264 mt.SetFailPoint(mtest.FailPoint{
265 ConfigureFailPoint: "failCommand",
266 Mode: mtest.FailPointMode{Times: 1},
267 Data: mtest.FailPointData{
268 ErrorCode: notWritablePrimaryErrorCode,
269 ErrorLabels: &[]string{
270 driver.NoWritesPerformed,
271 driver.RetryableWriteError,
272 },
273 FailCommands: []string{"insert"},
274 },
275 })
276 secondFailPointConfigured = true
277 }
278
279
280 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
281
282 require.True(mt, secondFailPointConfigured)
283
284
285 require.True(mt, err.(mongo.WriteException).HasErrorCode(int(shutdownInProgressErrorCode)))
286 })
287
288 mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2")
289 mt.RunOpts("retrying in sharded cluster", mtOpts, func(mt *mtest.T) {
290 tests := []struct {
291 name string
292
293
294
295
296
297 hostCount int
298 failpointErrorCode int32
299 expectedFailCount int
300 expectedSuccessCount int
301 }{
302 {
303 name: "retry on different mongos",
304 hostCount: 2,
305 failpointErrorCode: 6,
306 expectedFailCount: 2,
307 expectedSuccessCount: 0,
308 },
309 {
310 name: "retry on same mongos",
311 hostCount: 1,
312 failpointErrorCode: 6,
313 expectedFailCount: 1,
314 expectedSuccessCount: 1,
315 },
316 }
317
318 for _, tc := range tests {
319 mt.Run(tc.name, func(mt *mtest.T) {
320 hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
321 require.GreaterOrEqualf(mt, len(hosts), tc.hostCount,
322 "test cluster must have at least %v mongos hosts", tc.hostCount)
323
324
325 failPoint := mtest.FailPoint{
326 ConfigureFailPoint: "failCommand",
327 Mode: mtest.FailPointMode{
328 Times: 1,
329 },
330 Data: mtest.FailPointData{
331 FailCommands: []string{"insert"},
332 ErrorLabels: &[]string{"RetryableWriteError"},
333 ErrorCode: tc.failpointErrorCode,
334 CloseConnection: false,
335 },
336 }
337
338
339
340
341
342
343 for i := 0; i < tc.hostCount; i++ {
344 mt.ResetClient(options.Client().SetHosts([]string{hosts[i]}))
345 mt.SetFailPoint(failPoint)
346
347
348
349
350 defer mt.ResetClient(options.Client().SetHosts([]string{hosts[i]}))
351 defer mt.ClearFailPoints()
352 }
353
354 failCount := 0
355 successCount := 0
356
357 commandMonitor := &event.CommandMonitor{
358 Failed: func(context.Context, *event.CommandFailedEvent) {
359 failCount++
360 },
361 Succeeded: func(context.Context, *event.CommandSucceededEvent) {
362 successCount++
363 },
364 }
365
366
367 mt.ResetClient(options.Client().
368 SetHosts(hosts[:tc.hostCount]).
369 SetRetryWrites(true).
370 SetMonitor(commandMonitor))
371
372 _, _ = mt.Coll.InsertOne(context.Background(), bson.D{})
373
374 assert.Equal(mt, tc.expectedFailCount, failCount)
375 assert.Equal(mt, tc.expectedSuccessCount, successCount)
376 })
377 }
378 })
379 }
380
View as plain text