1
2
3
4
5
6
7 package driver
8
9 import (
10 "bytes"
11 "context"
12 "errors"
13 "fmt"
14 "strings"
15
16 "go.mongodb.org/mongo-driver/bson"
17 "go.mongodb.org/mongo-driver/internal/csot"
18 "go.mongodb.org/mongo-driver/mongo/description"
19 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
20 )
21
22
23
24
// LegacyNotPrimaryErrMsg is the message substring that the NotPrimary methods
// in this file look for when an error carries no error code (Code == 0).
const LegacyNotPrimaryErrMsg = "not master"
26
// Server error code tables consulted by the classification methods on Error
// and WriteConcernError.
var (
	// retryableCodes is checked by WriteConcernError.Retryable,
	// Error.RetryableRead and Error.RetryableWrite.
	retryableCodes = []int32{11600, 11602, 10107, 13435, 13436, 189, 91, 7, 6, 89, 9001, 262}
	// nodeIsRecoveringCodes is checked by the NodeIsRecovering methods.
	nodeIsRecoveringCodes = []int32{11600, 11602, 13436, 189, 91}
	// notPrimaryCodes is checked by the NotPrimary methods.
	notPrimaryCodes = []int32{10107, 13435, 10058}
	// nodeIsShuttingDownCodes is checked by the NodeIsShuttingDown methods.
	nodeIsShuttingDownCodes = []int32{11600, 91}

	// NOTE(review): the two codes below are not referenced in this part of
	// the file; judging by their names they identify write concern failures
	// (unknown replica-set write concern / unsatisfiable write concern).
	// Confirm against their callers.
	unknownReplWriteConcernCode   = int32(79)
	unsatisfiableWriteConcernCode = int32(100)
)
36
// Label strings and sentinel errors exported by this package.
var (
	// UnknownTransactionCommitResult is a label string.
	// NOTE(review): presumably a server/driver error label for commit
	// results that could not be determined; it is not used in this part of
	// the file, so confirm against callers.
	UnknownTransactionCommitResult = "UnknownTransactionCommitResult"
	// TransientTransactionError is a label string.
	// NOTE(review): presumably an error label for transient transaction
	// failures; it is not used in this part of the file.
	TransientTransactionError = "TransientTransactionError"
	// NetworkError is the label that Error.NetworkError, Error.RetryableRead
	// and Error.RetryableWrite look for in Error.Labels.
	NetworkError = "NetworkError"
	// RetryableWriteError is the label that marks a write as retryable; it is
	// checked by WriteCommandError.Retryable and Error.RetryableWrite.
	RetryableWriteError = "RetryableWriteError"
	// NoWritesPerformed is a label string.
	// NOTE(review): presumably an error label indicating that no writes were
	// applied; it is not used in this part of the file.
	NoWritesPerformed = "NoWritesPerformed"
	// ErrCursorNotFound is a sentinel error with the message
	// "cursor not found".
	ErrCursorNotFound = errors.New("cursor not found")
	// ErrUnacknowledgedWrite is a sentinel error with the message
	// "unacknowledged write".
	ErrUnacknowledgedWrite = errors.New("unacknowledged write")
	// ErrUnsupportedStorageEngine is a sentinel error telling the user that
	// the deployment does not support retryable writes and suggesting
	// retryWrites=false in the connection string.
	ErrUnsupportedStorageEngine = errors.New("this MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string")
	// ErrDeadlineWouldBeExceeded is a sentinel error for operations that were
	// not sent to the server because the timeout would be exceeded. It wraps
	// context.DeadlineExceeded via %w, so
	// errors.Is(err, context.DeadlineExceeded) matches it.
	ErrDeadlineWouldBeExceeded = fmt.Errorf(
		"operation not sent to server, as Timeout would be exceeded: %w",
		context.DeadlineExceeded)
	// ErrNegativeMaxTime is a sentinel error for a negative MaxTime value
	// supplied on an operation.
	ErrNegativeMaxTime = errors.New("a negative value was provided for MaxTime on an operation")
)
65
66
// QueryFailureError is an error that carries a message, the response
// document associated with the failure, and an optional underlying error.
type QueryFailureError struct {
	// Message is the leading part of the text produced by Error.
	Message string
	// Response is the response document; Error appends it to Message.
	Response bsoncore.Document
	// Wrapped is the underlying error returned by Unwrap; it may be nil.
	Wrapped error
}
72
73
74 func (e QueryFailureError) Error() string {
75 return fmt.Sprintf("%s: %v", e.Message, e.Response)
76 }
77
78
79 func (e QueryFailureError) Unwrap() error {
80 return e.Wrapped
81 }
82
83
// ResponseError is an error that pairs a message with an optional underlying
// error.
type ResponseError struct {
	// Message is the text reported by Error.
	Message string
	// Wrapped is the underlying error; when non-nil its text is appended to
	// Message by Error.
	Wrapped error
}

// NewCommandResponseError builds a ResponseError from msg and err. err may be
// nil.
func NewCommandResponseError(msg string, err error) ResponseError {
	var respErr ResponseError
	respErr.Message = msg
	respErr.Wrapped = err
	return respErr
}

// Error implements the error interface. It returns Message alone when there
// is no wrapped error, and "<Message>: <Wrapped>" otherwise.
func (e ResponseError) Error() string {
	if e.Wrapped == nil {
		return e.Message
	}
	return fmt.Sprintf("%s: %s", e.Message, e.Wrapped)
}
101
102
// WriteCommandError is the error returned by ExtractErrorFromServerResponse
// for a reply that reports ok but contains write errors and/or a write
// concern error.
type WriteCommandError struct {
	// WriteConcernError is parsed from the reply's "writeConcernError"
	// document; nil when the reply had none.
	WriteConcernError *WriteConcernError
	// WriteErrors holds one entry per document in the reply's "writeErrors"
	// array.
	WriteErrors WriteErrors
	// Labels holds the error labels collected from the reply, both the
	// top-level "errorLabels" and those nested in "writeConcernError".
	Labels []string
	// Raw is the full reply document the error was extracted from.
	Raw bsoncore.Document
}
109
110
111
112 func (wce WriteCommandError) UnsupportedStorageEngine() bool {
113 for _, writeError := range wce.WriteErrors {
114 if writeError.Code == 20 && strings.HasPrefix(strings.ToLower(writeError.Message), "transaction numbers") {
115 return true
116 }
117 }
118 return false
119 }
120
121 func (wce WriteCommandError) Error() string {
122 var buf bytes.Buffer
123 fmt.Fprint(&buf, "write command error: [")
124 fmt.Fprintf(&buf, "{%s}, ", wce.WriteErrors)
125 fmt.Fprintf(&buf, "{%s}]", wce.WriteConcernError)
126 return buf.String()
127 }
128
129
130 func (wce WriteCommandError) Retryable(wireVersion *description.VersionRange) bool {
131 for _, label := range wce.Labels {
132 if label == RetryableWriteError {
133 return true
134 }
135 }
136 if wireVersion != nil && wireVersion.Max >= 9 {
137 return false
138 }
139
140 if wce.WriteConcernError == nil {
141 return false
142 }
143 return (*wce.WriteConcernError).Retryable()
144 }
145
146
147 func (wce WriteCommandError) HasErrorLabel(label string) bool {
148 if wce.Labels != nil {
149 for _, l := range wce.Labels {
150 if l == label {
151 return true
152 }
153 }
154 }
155 return false
156 }
157
158
159
// WriteConcernError describes the "writeConcernError" document of a server
// reply, as parsed by ExtractErrorFromServerResponse.
type WriteConcernError struct {
	// Name is taken from the document's "codeName" field.
	Name string
	// Code is taken from the document's "code" field.
	Code int64
	// Message is taken from the document's "errmsg" field.
	Message string
	// Details is a copy of the document's "errInfo" sub-document.
	Details bsoncore.Document
	// Labels is not populated by ExtractErrorFromServerResponse, which adds
	// the nested "errorLabels" to WriteCommandError.Labels instead.
	// NOTE(review): confirm whether any other code sets this field.
	Labels []string
	// TopologyVersion is parsed from the reply's top-level "topologyVersion"
	// document, when present and valid.
	TopologyVersion *description.TopologyVersion
	// Raw is the "writeConcernError" document itself.
	Raw bsoncore.Document
}
169
170 func (wce WriteConcernError) Error() string {
171 if wce.Name != "" {
172 return fmt.Sprintf("(%v) %v", wce.Name, wce.Message)
173 }
174 return wce.Message
175 }
176
177
178 func (wce WriteConcernError) Retryable() bool {
179 for _, code := range retryableCodes {
180 if wce.Code == int64(code) {
181 return true
182 }
183 }
184
185 return false
186 }
187
188
189 func (wce WriteConcernError) NodeIsRecovering() bool {
190 for _, code := range nodeIsRecoveringCodes {
191 if wce.Code == int64(code) {
192 return true
193 }
194 }
195 hasNoCode := wce.Code == 0
196 return hasNoCode && strings.Contains(wce.Message, "node is recovering")
197 }
198
199
200 func (wce WriteConcernError) NodeIsShuttingDown() bool {
201 for _, code := range nodeIsShuttingDownCodes {
202 if wce.Code == int64(code) {
203 return true
204 }
205 }
206 hasNoCode := wce.Code == 0
207 return hasNoCode && strings.Contains(wce.Message, "node is shutting down")
208 }
209
210
211 func (wce WriteConcernError) NotPrimary() bool {
212 for _, code := range notPrimaryCodes {
213 if wce.Code == int64(code) {
214 return true
215 }
216 }
217 hasNoCode := wce.Code == 0
218 return hasNoCode && strings.Contains(wce.Message, LegacyNotPrimaryErrMsg)
219 }
220
221
222
// WriteError describes one entry of a server reply's "writeErrors" array, as
// parsed by ExtractErrorFromServerResponse.
type WriteError struct {
	// Index is taken from the entry's "index" field.
	Index int64
	// Code is taken from the entry's "code" field.
	Code int64
	// Message is taken from the entry's "errmsg" field.
	Message string
	// Details is a copy of the entry's "errInfo" sub-document.
	Details bsoncore.Document
	// Raw is the entry document itself.
	Raw bsoncore.Document
}
230
231 func (we WriteError) Error() string { return we.Message }
232
233
234
// WriteErrors is a list of write errors; its Error method renders all of
// them as a single message.
type WriteErrors []WriteError
236
237 func (we WriteErrors) Error() string {
238 var buf bytes.Buffer
239 fmt.Fprint(&buf, "write errors: [")
240 for idx, err := range we {
241 if idx != 0 {
242 fmt.Fprintf(&buf, ", ")
243 }
244 fmt.Fprintf(&buf, "{%s}", err)
245 }
246 fmt.Fprint(&buf, "]")
247 return buf.String()
248 }
249
250
// Error is the error returned by ExtractErrorFromServerResponse for a server
// reply that does not report ok.
type Error struct {
	// Code is taken from the reply's "code" field when it is an int32.
	Code int32
	// Message is taken from the reply's "errmsg" field; it defaults to
	// "command failed" when the reply has none.
	Message string
	// Labels holds the error labels collected from the reply.
	Labels []string
	// Name is taken from the reply's "codeName" field.
	Name string
	// Wrapped is an optional underlying error, returned by Unwrap.
	// ExtractErrorFromServerResponse sets it to context.DeadlineExceeded for
	// code 50 under a timeout context.
	Wrapped error
	// TopologyVersion is parsed from the reply's "topologyVersion" document,
	// when present and valid.
	TopologyVersion *description.TopologyVersion
	// Raw is the full reply document.
	Raw bsoncore.Document
}
260
261
262 func (e Error) UnsupportedStorageEngine() bool {
263 return e.Code == 20 && strings.HasPrefix(strings.ToLower(e.Message), "transaction numbers")
264 }
265
266
267 func (e Error) Error() string {
268 var msg string
269 if e.Name != "" {
270 msg = fmt.Sprintf("(%v)", e.Name)
271 }
272 msg += " " + e.Message
273 if e.Wrapped != nil {
274 msg += ": " + e.Wrapped.Error()
275 }
276 return msg
277 }
278
279
280 func (e Error) Unwrap() error {
281 return e.Wrapped
282 }
283
284
285 func (e Error) HasErrorLabel(label string) bool {
286 if e.Labels != nil {
287 for _, l := range e.Labels {
288 if l == label {
289 return true
290 }
291 }
292 }
293 return false
294 }
295
296
297 func (e Error) RetryableRead() bool {
298 for _, label := range e.Labels {
299 if label == NetworkError {
300 return true
301 }
302 }
303 for _, code := range retryableCodes {
304 if e.Code == code {
305 return true
306 }
307 }
308
309 return false
310 }
311
312
313 func (e Error) RetryableWrite(wireVersion *description.VersionRange) bool {
314 for _, label := range e.Labels {
315 if label == NetworkError || label == RetryableWriteError {
316 return true
317 }
318 }
319 if wireVersion != nil && wireVersion.Max >= 9 {
320 return false
321 }
322 for _, code := range retryableCodes {
323 if e.Code == code {
324 return true
325 }
326 }
327
328 return false
329 }
330
331
332 func (e Error) NetworkError() bool {
333 for _, label := range e.Labels {
334 if label == NetworkError {
335 return true
336 }
337 }
338 return false
339 }
340
341
342 func (e Error) NodeIsRecovering() bool {
343 for _, code := range nodeIsRecoveringCodes {
344 if e.Code == code {
345 return true
346 }
347 }
348 hasNoCode := e.Code == 0
349 return hasNoCode && strings.Contains(e.Message, "node is recovering")
350 }
351
352
353 func (e Error) NodeIsShuttingDown() bool {
354 for _, code := range nodeIsShuttingDownCodes {
355 if e.Code == code {
356 return true
357 }
358 }
359 hasNoCode := e.Code == 0
360 return hasNoCode && strings.Contains(e.Message, "node is shutting down")
361 }
362
363
364 func (e Error) NotPrimary() bool {
365 for _, code := range notPrimaryCodes {
366 if e.Code == code {
367 return true
368 }
369 }
370 hasNoCode := e.Code == 0
371 return hasNoCode && strings.Contains(e.Message, LegacyNotPrimaryErrMsg)
372 }
373
374
375 func (e Error) NamespaceNotFound() bool {
376 return e.Code == 26 || e.Message == "ns not found"
377 }
378
379
380
// ExtractErrorFromServerResponse inspects the server reply doc and returns the
// error it describes, or nil when the reply reports ok and contains neither
// write errors nor a write concern error.
//
// Possible results:
//   - the error from doc.Elements, when the document cannot be iterated;
//   - an Error, when the reply has no "ok" field equal to 1 (or true). Its
//     Wrapped field is set to context.DeadlineExceeded when ctx is a timeout
//     context according to csot.IsTimeoutContext and the code is 50;
//   - a WriteCommandError, when the reply is ok but has at least one entry in
//     "writeErrors" or a "writeConcernError" document;
//   - nil otherwise.
//
// Fields of an unexpected BSON type are ignored rather than reported.
func ExtractErrorFromServerResponse(ctx context.Context, doc bsoncore.Document) error {
	var errmsg, codeName string
	var code int32
	var labels []string
	var ok bool
	var tv *description.TopologyVersion
	var wcError WriteCommandError
	elems, err := doc.Elements()
	if err != nil {
		return err
	}

	// Single pass over the top-level elements, collecting everything needed
	// to build either an Error or a WriteCommandError afterwards.
	for _, elem := range elems {
		switch elem.Key() {
		case "ok":
			// "ok" counts as success when it equals 1 as an int32, int64 or
			// double, or when it is the boolean true.
			switch elem.Value().Type {
			case bson.TypeInt32:
				if elem.Value().Int32() == 1 {
					ok = true
				}
			case bson.TypeInt64:
				if elem.Value().Int64() == 1 {
					ok = true
				}
			case bson.TypeDouble:
				if elem.Value().Double() == 1 {
					ok = true
				}
			case bson.TypeBoolean:
				if elem.Value().Boolean() {
					ok = true
				}
			}
		case "errmsg":
			if str, okay := elem.Value().StringValueOK(); okay {
				errmsg = str
			}
		case "codeName":
			if str, okay := elem.Value().StringValueOK(); okay {
				codeName = str
			}
		case "code":
			// Only an int32 code is accepted here.
			if c, okay := elem.Value().Int32OK(); okay {
				code = c
			}
		case "errorLabels":
			if arr, okay := elem.Value().ArrayOK(); okay {
				vals, err := arr.Values()
				if err != nil {
					// Unreadable label array: move on to the next element.
					continue
				}
				for _, val := range vals {
					// This ok is scoped to the if statement and shadows the
					// outer ok; the outer flag is not modified.
					if str, ok := val.StringValueOK(); ok {
						labels = append(labels, str)
					}
				}

			}
		case "writeErrors":
			arr, exists := elem.Value().ArrayOK()
			if !exists {
				// Leaves the switch; the loop continues with the next element.
				break
			}
			vals, err := arr.Values()
			if err != nil {
				continue
			}
			for _, val := range vals {
				var we WriteError
				// doc here shadows the function parameter within this loop
				// body and refers to a single writeErrors entry.
				doc, exists := val.DocumentOK()
				if !exists {
					continue
				}
				if index, exists := doc.Lookup("index").AsInt64OK(); exists {
					we.Index = index
				}
				if code, exists := doc.Lookup("code").AsInt64OK(); exists {
					we.Code = code
				}
				if msg, exists := doc.Lookup("errmsg").StringValueOK(); exists {
					we.Message = msg
				}
				if info, exists := doc.Lookup("errInfo").DocumentOK(); exists {
					// Details receives its own copy of errInfo; Raw below
					// is assigned without copying.
					we.Details = make([]byte, len(info))
					copy(we.Details, info)
				}
				we.Raw = doc
				wcError.WriteErrors = append(wcError.WriteErrors, we)
			}
		case "writeConcernError":
			// doc here shadows the function parameter within this case and
			// refers to the writeConcernError sub-document.
			doc, exists := elem.Value().DocumentOK()
			if !exists {
				break
			}
			wcError.WriteConcernError = new(WriteConcernError)
			wcError.WriteConcernError.Raw = doc
			if code, exists := doc.Lookup("code").AsInt64OK(); exists {
				wcError.WriteConcernError.Code = code
			}
			if name, exists := doc.Lookup("codeName").StringValueOK(); exists {
				wcError.WriteConcernError.Name = name
			}
			if msg, exists := doc.Lookup("errmsg").StringValueOK(); exists {
				wcError.WriteConcernError.Message = msg
			}
			if info, exists := doc.Lookup("errInfo").DocumentOK(); exists {
				wcError.WriteConcernError.Details = make([]byte, len(info))
				copy(wcError.WriteConcernError.Details, info)
			}
			// Labels nested in the write concern error are appended to the
			// same slice as the top-level errorLabels.
			if errLabels, exists := doc.Lookup("errorLabels").ArrayOK(); exists {
				vals, err := errLabels.Values()
				if err != nil {
					continue
				}
				for _, val := range vals {
					if str, ok := val.StringValueOK(); ok {
						labels = append(labels, str)
					}
				}
			}
		case "topologyVersion":
			// Both doc and ok are new variables local to this case; the outer
			// ok flag is not modified.
			doc, ok := elem.Value().DocumentOK()
			if !ok {
				break
			}
			// A topology version that fails to parse is dropped.
			version, err := description.NewTopologyVersion(bson.Raw(doc))
			if err == nil {
				tv = version
			}
		}
	}

	if !ok {
		if errmsg == "" {
			errmsg = "command failed"
		}

		err := Error{
			Code:            code,
			Message:         errmsg,
			Name:            codeName,
			Labels:          labels,
			TopologyVersion: tv,
			Raw:             doc,
		}

		// For code 50 under a timeout context, wrap context.DeadlineExceeded
		// so that callers can match it with errors.Is.
		// NOTE(review): code 50 is presumably the server's MaxTimeMSExpired
		// error, and csot.IsTimeoutContext presumably reports a context that
		// carries an operation-level timeout - confirm both.
		if csot.IsTimeoutContext(ctx) && err.Code == 50 {
			err.Wrapped = context.DeadlineExceeded
		}

		return err
	}

	// The command itself succeeded; report write-level failures, if any.
	if len(wcError.WriteErrors) > 0 || wcError.WriteConcernError != nil {
		wcError.Labels = labels
		if wcError.WriteConcernError != nil {
			wcError.WriteConcernError.TopologyVersion = tv
		}
		wcError.Raw = doc
		return wcError
	}

	return nil
}
553
View as plain text