1
2
3
4
5
6
7 package mtest
8
9 import (
10 "errors"
11 "fmt"
12
13 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
14 "go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
15 )
16
17
18 type SentMessage struct {
19 RequestID int32
20 RawMessage wiremessage.WireMessage
21 Command bsoncore.Document
22 OpCode wiremessage.OpCode
23
24
25
26
27
28 ReadPreference bsoncore.Document
29
30
31
32
33 DocumentSequence *bsoncore.DocumentSequence
34 }
35
36 type sentMsgParseFn func([]byte) (*SentMessage, error)
37
38 func getSentMessageParser(opcode wiremessage.OpCode) (sentMsgParseFn, bool) {
39 switch opcode {
40 case wiremessage.OpQuery:
41 return parseOpQuery, true
42 case wiremessage.OpMsg:
43 return parseSentOpMsg, true
44 case wiremessage.OpCompressed:
45 return parseSentOpCompressed, true
46 default:
47 return nil, false
48 }
49 }
50
51 func parseOpQuery(wm []byte) (*SentMessage, error) {
52 var ok bool
53
54 if _, wm, ok = wiremessage.ReadQueryFlags(wm); !ok {
55 return nil, errors.New("failed to read query flags")
56 }
57 if _, wm, ok = wiremessage.ReadQueryFullCollectionName(wm); !ok {
58 return nil, errors.New("failed to read full collection name")
59 }
60 if _, wm, ok = wiremessage.ReadQueryNumberToSkip(wm); !ok {
61 return nil, errors.New("failed to read number to skip")
62 }
63 if _, wm, ok = wiremessage.ReadQueryNumberToReturn(wm); !ok {
64 return nil, errors.New("failed to read number to return")
65 }
66
67 query, wm, ok := wiremessage.ReadQueryQuery(wm)
68 if !ok {
69 return nil, errors.New("failed to read query")
70 }
71
72
73
74 commandDoc := query
75 var rpDoc bsoncore.Document
76
77 dollarQueryVal, err := query.LookupErr("$query")
78 if err == nil {
79 commandDoc = dollarQueryVal.Document()
80
81 rpVal, err := query.LookupErr("$readPreference")
82 if err != nil {
83 return nil, fmt.Errorf("query %s contains $query but not $readPreference fields", query)
84 }
85 rpDoc = rpVal.Document()
86 }
87
88
89
90 var docSequence *bsoncore.DocumentSequence
91 cmdElems, _ := commandDoc.Elements()
92 for _, elem := range cmdElems {
93 switch elem.Key() {
94 case "documents", "updates", "deletes":
95 docSequence = &bsoncore.DocumentSequence{
96 Style: bsoncore.ArrayStyle,
97 Data: elem.Value().Array(),
98 }
99 }
100 if docSequence != nil {
101
102 break
103 }
104 }
105
106 sm := &SentMessage{
107 Command: commandDoc,
108 ReadPreference: rpDoc,
109 DocumentSequence: docSequence,
110 }
111 return sm, nil
112 }
113
114 func parseSentMessage(wm []byte) (*SentMessage, error) {
115
116 _, requestID, _, opcode, remaining, ok := wiremessage.ReadHeader(wm)
117 if !ok {
118 return nil, errors.New("failed to read wiremessage header")
119 }
120
121 parseFn, ok := getSentMessageParser(opcode)
122 if !ok {
123 return nil, fmt.Errorf("unknown opcode: %v", opcode)
124 }
125 sent, err := parseFn(remaining)
126 if err != nil {
127 return nil, fmt.Errorf("error parsing wiremessage with opcode %s: %w", opcode, err)
128 }
129
130 sent.RequestID = requestID
131 sent.RawMessage = wm
132 sent.OpCode = opcode
133 return sent, nil
134 }
135
136 func parseSentOpMsg(wm []byte) (*SentMessage, error) {
137 var ok bool
138 var err error
139
140 if _, wm, ok = wiremessage.ReadMsgFlags(wm); !ok {
141 return nil, errors.New("failed to read flags")
142 }
143
144 if wm, err = assertMsgSectionType(wm, wiremessage.SingleDocument); err != nil {
145 return nil, fmt.Errorf("error verifying section type for command document: %w", err)
146 }
147
148 var commandDoc bsoncore.Document
149 commandDoc, wm, ok = wiremessage.ReadMsgSectionSingleDocument(wm)
150 if !ok {
151 return nil, errors.New("failed to read command document")
152 }
153
154 var rpDoc bsoncore.Document
155 if rpVal, err := commandDoc.LookupErr("$readPreference"); err == nil {
156 rpDoc = rpVal.Document()
157 }
158
159 var docSequence *bsoncore.DocumentSequence
160 if len(wm) != 0 {
161
162 if wm, err = assertMsgSectionType(wm, wiremessage.DocumentSequence); err != nil {
163 return nil, fmt.Errorf("error verifying section type for document sequence: %w", err)
164 }
165
166 var data []byte
167 _, data, wm, ok = wiremessage.ReadMsgSectionRawDocumentSequence(wm)
168 if !ok {
169 return nil, errors.New("failed to read document sequence")
170 }
171
172 docSequence = &bsoncore.DocumentSequence{
173 Style: bsoncore.SequenceStyle,
174 Data: data,
175 }
176 }
177
178 sm := &SentMessage{
179 Command: commandDoc,
180 ReadPreference: rpDoc,
181 DocumentSequence: docSequence,
182 }
183 return sm, nil
184 }
185
186 func parseSentOpCompressed(wm []byte) (*SentMessage, error) {
187 originalOpcode, wm, err := parseOpCompressed(wm)
188 if err != nil {
189 return nil, err
190 }
191
192 parser, ok := getSentMessageParser(originalOpcode)
193 if !ok {
194 return nil, fmt.Errorf("unknown original opcode %v", originalOpcode)
195 }
196 return parser(wm)
197 }
198
View as plain text