1
2
3
4
5
6
7 package wiremessage
8
9 import (
10 "bytes"
11 "strings"
12 "sync/atomic"
13
14 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
15 )
16
17
// WireMessage is a named byte-slice type.
// NOTE(review): presumably it holds one complete encoded wire protocol
// message; nothing in this file uses it, so confirm against callers.
type WireMessage []byte
19
// globalRequestID is the package-wide counter behind NextRequestID. It must
// only be modified through the sync/atomic functions.
var globalRequestID int32

// NextRequestID atomically increments the package-wide request counter and
// returns the new value, so the first call in a process returns 1.
func NextRequestID() int32 {
	next := atomic.AddInt32(&globalRequestID, 1)
	return next
}
24
25
// OpCode is the numeric operation code stored in the fourth int32 of a wire
// message header (see ReadHeader / AppendHeader).
type OpCode int32

// Operation codes understood by this package. The two blank identifiers keep
// the values 1001 and 2003 visible in the table without exporting a name for
// them.
const (
	OpReply        OpCode = 1
	_              OpCode = 1001
	OpUpdate       OpCode = 2001
	OpInsert       OpCode = 2002
	_              OpCode = 2003
	OpQuery        OpCode = 2004
	OpGetMore      OpCode = 2005
	OpDelete       OpCode = 2006
	OpKillCursors  OpCode = 2007
	OpCommand      OpCode = 2010
	OpCommandReply OpCode = 2011
	OpCompressed   OpCode = 2012
	OpMsg          OpCode = 2013
)

// String returns the protocol-style name of the opcode (for example
// "OP_MSG"), or "<invalid opcode>" for any value without a named constant.
func (oc OpCode) String() string {
	name := "<invalid opcode>"
	switch oc {
	case OpReply:
		name = "OP_REPLY"
	case OpUpdate:
		name = "OP_UPDATE"
	case OpInsert:
		name = "OP_INSERT"
	case OpQuery:
		name = "OP_QUERY"
	case OpGetMore:
		name = "OP_GET_MORE"
	case OpDelete:
		name = "OP_DELETE"
	case OpKillCursors:
		name = "OP_KILL_CURSORS"
	case OpCommand:
		name = "OP_COMMAND"
	case OpCommandReply:
		name = "OP_COMMANDREPLY"
	case OpCompressed:
		name = "OP_COMPRESSED"
	case OpMsg:
		name = "OP_MSG"
	}
	return name
}
77
78
// QueryFlag is a bit set of query flags, encoded on the wire as an int32
// (see AppendQueryFlags).
type QueryFlag int32

// Individual query flag bits. Bit 0 has no named constant; the named flags
// occupy bits 1 through 7.
const (
	TailableCursor  QueryFlag = 1 << 1
	SecondaryOK     QueryFlag = 1 << 2
	OplogReplay     QueryFlag = 1 << 3
	NoCursorTimeout QueryFlag = 1 << 4
	AwaitData       QueryFlag = 1 << 5
	Exhaust         QueryFlag = 1 << 6
	Partial         QueryFlag = 1 << 7
)

// String lists the names of all flags set in qf, in ascending bit order,
// formatted as "[A, B]". Bits without a named constant are ignored, so a
// value with no named flag set yields "[]".
func (qf QueryFlag) String() string {
	table := [...]struct {
		flag QueryFlag
		name string
	}{
		{TailableCursor, "TailableCursor"},
		{SecondaryOK, "SecondaryOK"},
		{OplogReplay, "OplogReplay"},
		{NoCursorTimeout, "NoCursorTimeout"},
		{AwaitData, "AwaitData"},
		{Exhaust, "Exhaust"},
		{Partial, "Partial"},
	}

	var set []string
	for _, entry := range table {
		if qf&entry.flag == entry.flag {
			set = append(set, entry.name)
		}
	}
	return "[" + strings.Join(set, ", ") + "]"
}
122
123
// MsgFlag is a bit set of message flags, stored as an unsigned 32-bit value
// (see AppendMsgFlags / ReadMsgFlags).
type MsgFlag uint32

// Individual message flag bits: two in the low bits and one at bit 16.
const (
	ChecksumPresent MsgFlag = 1 << 0
	MoreToCome      MsgFlag = 1 << 1

	ExhaustAllowed MsgFlag = 1 << 16
)
133
134
// ReplyFlag is a bit set of reply flags, encoded on the wire as an int32
// (see AppendReplyFlags / ReadReplyFlags).
type ReplyFlag int32

// Individual reply flag bits, occupying bits 0 through 3.
const (
	CursorNotFound   ReplyFlag = 1 << 0
	QueryFailure     ReplyFlag = 1 << 1
	ShardConfigStale ReplyFlag = 1 << 2
	AwaitCapable     ReplyFlag = 1 << 3
)

// String lists the names of all flags set in rf, in ascending bit order,
// formatted as "[A, B]". Unnamed bits are ignored; no flags yields "[]".
func (rf ReplyFlag) String() string {
	table := [...]struct {
		flag ReplyFlag
		name string
	}{
		{CursorNotFound, "CursorNotFound"},
		{QueryFailure, "QueryFailure"},
		{ShardConfigStale, "ShardConfigStale"},
		{AwaitCapable, "AwaitCapable"},
	}

	var set []string
	for _, entry := range table {
		if rf&entry.flag == entry.flag {
			set = append(set, entry.name)
		}
	}
	return "[" + strings.Join(set, ", ") + "]"
}
165
166
// SectionType is the one-byte tag written in front of a message section
// (see AppendMsgSectionType / ReadMsgSectionType).
type SectionType uint8

// Known section types.
const (
	SingleDocument   SectionType = 0
	DocumentSequence SectionType = 1
)
174
175
// CompressorID is the one-byte compressor identifier carried in a compressed
// message (see AppendCompressedCompressorID / ReadCompressedCompressorID).
type CompressorID uint8

// Known compressor identifiers.
const (
	CompressorNoOp   CompressorID = 0
	CompressorSnappy CompressorID = 1
	CompressorZLib   CompressorID = 2
	CompressorZstd   CompressorID = 3
)

// String returns the constant name for a known compressor ID, or
// "CompressorInvalid" for any other value.
func (id CompressorID) String() string {
	names := [...]string{
		CompressorNoOp:   "CompressorNoOp",
		CompressorSnappy: "CompressorSnappy",
		CompressorZLib:   "CompressorZLib",
		CompressorZstd:   "CompressorZstd",
	}
	if int(id) >= len(names) {
		return "CompressorInvalid"
	}
	return names[id]
}
201
const (
	// DefaultZlibLevel is the default compression level for zlib.
	// NOTE(review): where this default is applied is not visible in this file.
	DefaultZlibLevel = 6

	// DefaultZstdLevel is the default compression level for zstd.
	// NOTE(review): where this default is applied is not visible in this file.
	DefaultZstdLevel = 6
)
209
210
211
212 func AppendHeaderStart(dst []byte, reqid, respto int32, opcode OpCode) (index int32, b []byte) {
213 index, dst = bsoncore.ReserveLength(dst)
214 dst = appendi32(dst, reqid)
215 dst = appendi32(dst, respto)
216 dst = appendi32(dst, int32(opcode))
217 return index, dst
218 }
219
220
221 func AppendHeader(dst []byte, length, reqid, respto int32, opcode OpCode) []byte {
222 dst = appendi32(dst, length)
223 dst = appendi32(dst, reqid)
224 dst = appendi32(dst, respto)
225 dst = appendi32(dst, int32(opcode))
226 return dst
227 }
228
229
230 func ReadHeader(src []byte) (length, requestID, responseTo int32, opcode OpCode, rem []byte, ok bool) {
231 if len(src) < 16 {
232 return 0, 0, 0, 0, src, false
233 }
234 length = (int32(src[0]) | int32(src[1])<<8 | int32(src[2])<<16 | int32(src[3])<<24)
235 requestID = (int32(src[4]) | int32(src[5])<<8 | int32(src[6])<<16 | int32(src[7])<<24)
236 responseTo = (int32(src[8]) | int32(src[9])<<8 | int32(src[10])<<16 | int32(src[11])<<24)
237 opcode = OpCode(int32(src[12]) | int32(src[13])<<8 | int32(src[14])<<16 | int32(src[15])<<24)
238 return length, requestID, responseTo, opcode, src[16:], true
239 }
240
241
242 func AppendQueryFlags(dst []byte, flags QueryFlag) []byte {
243 return appendi32(dst, int32(flags))
244 }
245
246
247 func AppendMsgFlags(dst []byte, flags MsgFlag) []byte {
248 return appendi32(dst, int32(flags))
249 }
250
251
252 func AppendReplyFlags(dst []byte, flags ReplyFlag) []byte {
253 return appendi32(dst, int32(flags))
254 }
255
256
257 func AppendMsgSectionType(dst []byte, stype SectionType) []byte {
258 return append(dst, byte(stype))
259 }
260
261
262 func AppendQueryFullCollectionName(dst []byte, ns string) []byte {
263 return appendCString(dst, ns)
264 }
265
266
267 func AppendQueryNumberToSkip(dst []byte, skip int32) []byte {
268 return appendi32(dst, skip)
269 }
270
271
272 func AppendQueryNumberToReturn(dst []byte, nor int32) []byte {
273 return appendi32(dst, nor)
274 }
275
276
277 func AppendReplyCursorID(dst []byte, id int64) []byte {
278 return appendi64(dst, id)
279 }
280
281
282 func AppendReplyStartingFrom(dst []byte, sf int32) []byte {
283 return appendi32(dst, sf)
284 }
285
286
287 func AppendReplyNumberReturned(dst []byte, nr int32) []byte {
288 return appendi32(dst, nr)
289 }
290
291
292 func AppendCompressedOriginalOpCode(dst []byte, opcode OpCode) []byte {
293 return appendi32(dst, int32(opcode))
294 }
295
296
297
298 func AppendCompressedUncompressedSize(dst []byte, size int32) []byte { return appendi32(dst, size) }
299
300
301 func AppendCompressedCompressorID(dst []byte, id CompressorID) []byte {
302 return append(dst, byte(id))
303 }
304
305
// AppendCompressedCompressedMessage appends the bytes of msg to dst verbatim,
// with no length prefix or terminator.
func AppendCompressedCompressedMessage(dst []byte, msg []byte) []byte {
	dst = append(dst, msg...)
	return dst
}
307
308
309 func AppendGetMoreZero(dst []byte) []byte {
310 return appendi32(dst, 0)
311 }
312
313
314 func AppendGetMoreFullCollectionName(dst []byte, ns string) []byte {
315 return appendCString(dst, ns)
316 }
317
318
319 func AppendGetMoreNumberToReturn(dst []byte, numToReturn int32) []byte {
320 return appendi32(dst, numToReturn)
321 }
322
323
324 func AppendGetMoreCursorID(dst []byte, cursorID int64) []byte {
325 return appendi64(dst, cursorID)
326 }
327
328
329 func AppendKillCursorsZero(dst []byte) []byte {
330 return appendi32(dst, 0)
331 }
332
333
334 func AppendKillCursorsNumberIDs(dst []byte, numIDs int32) []byte {
335 return appendi32(dst, numIDs)
336 }
337
338
339 func AppendKillCursorsCursorIDs(dst []byte, cursors []int64) []byte {
340 for _, cursor := range cursors {
341 dst = appendi64(dst, cursor)
342 }
343 return dst
344 }
345
346
347 func ReadMsgFlags(src []byte) (flags MsgFlag, rem []byte, ok bool) {
348 i32, rem, ok := readi32(src)
349 return MsgFlag(i32), rem, ok
350 }
351
352
353 func IsMsgMoreToCome(wm []byte) bool {
354 return len(wm) >= 20 &&
355 OpCode(readi32unsafe(wm[12:16])) == OpMsg &&
356 MsgFlag(readi32unsafe(wm[16:20]))&MoreToCome == MoreToCome
357 }
358
359
360 func ReadMsgSectionType(src []byte) (stype SectionType, rem []byte, ok bool) {
361 if len(src) < 1 {
362 return 0, src, false
363 }
364 return SectionType(src[0]), src[1:], true
365 }
366
367
368 func ReadMsgSectionSingleDocument(src []byte) (doc bsoncore.Document, rem []byte, ok bool) {
369 return bsoncore.ReadDocument(src)
370 }
371
372
373
374 func ReadMsgSectionDocumentSequence(src []byte) (identifier string, docs []bsoncore.Document, rem []byte, ok bool) {
375 length, rem, ok := readi32(src)
376 if !ok || int(length) > len(src) {
377 return "", nil, rem, false
378 }
379
380 rem, ret := rem[:length-4], rem[length-4:]
381
382 identifier, rem, ok = readcstring(rem)
383 if !ok {
384 return "", nil, rem, false
385 }
386
387 docs = make([]bsoncore.Document, 0)
388 var doc bsoncore.Document
389 for {
390 doc, rem, ok = bsoncore.ReadDocument(rem)
391 if !ok {
392 break
393 }
394 docs = append(docs, doc)
395 }
396 if len(rem) > 0 {
397 return "", nil, append(rem, ret...), false
398 }
399
400 return identifier, docs, ret, true
401 }
402
403
404
405 func ReadMsgSectionRawDocumentSequence(src []byte) (identifier string, data []byte, rem []byte, ok bool) {
406 length, rem, ok := readi32(src)
407 if !ok || int(length) > len(src) {
408 return "", nil, rem, false
409 }
410
411
412
413 rem, rest := rem[:length-4], rem[length-4:]
414
415 identifier, rem, ok = readcstring(rem)
416 if !ok {
417 return "", nil, rem, false
418 }
419
420 return identifier, rem, rest, true
421 }
422
423
424 func ReadMsgChecksum(src []byte) (checksum uint32, rem []byte, ok bool) {
425 i32, rem, ok := readi32(src)
426 return uint32(i32), rem, ok
427 }
428
429
430
431
432
433 func ReadQueryFlags(src []byte) (flags QueryFlag, rem []byte, ok bool) {
434 i32, rem, ok := readi32(src)
435 return QueryFlag(i32), rem, ok
436 }
437
438
439
440
441
442 func ReadQueryFullCollectionName(src []byte) (collname string, rem []byte, ok bool) {
443 return readcstring(src)
444 }
445
446
447
448
449
450 func ReadQueryNumberToSkip(src []byte) (nts int32, rem []byte, ok bool) {
451 return readi32(src)
452 }
453
454
455
456
457
458 func ReadQueryNumberToReturn(src []byte) (ntr int32, rem []byte, ok bool) {
459 return readi32(src)
460 }
461
462
463
464
465
466 func ReadQueryQuery(src []byte) (query bsoncore.Document, rem []byte, ok bool) {
467 return bsoncore.ReadDocument(src)
468 }
469
470
471
472
473
474 func ReadQueryReturnFieldsSelector(src []byte) (rfs bsoncore.Document, rem []byte, ok bool) {
475 return bsoncore.ReadDocument(src)
476 }
477
478
479 func ReadReplyFlags(src []byte) (flags ReplyFlag, rem []byte, ok bool) {
480 i32, rem, ok := readi32(src)
481 return ReplyFlag(i32), rem, ok
482 }
483
484
485 func ReadReplyCursorID(src []byte) (cursorID int64, rem []byte, ok bool) {
486 return readi64(src)
487 }
488
489
490 func ReadReplyStartingFrom(src []byte) (startingFrom int32, rem []byte, ok bool) {
491 return readi32(src)
492 }
493
494
495 func ReadReplyNumberReturned(src []byte) (numberReturned int32, rem []byte, ok bool) {
496 return readi32(src)
497 }
498
499
500 func ReadReplyDocuments(src []byte) (docs []bsoncore.Document, rem []byte, ok bool) {
501 rem = src
502 for {
503 var doc bsoncore.Document
504 doc, rem, ok = bsoncore.ReadDocument(rem)
505 if !ok {
506 break
507 }
508
509 docs = append(docs, doc)
510 }
511
512 return docs, rem, true
513 }
514
515
516 func ReadReplyDocument(src []byte) (doc bsoncore.Document, rem []byte, ok bool) {
517 return bsoncore.ReadDocument(src)
518 }
519
520
521 func ReadCompressedOriginalOpCode(src []byte) (opcode OpCode, rem []byte, ok bool) {
522 i32, rem, ok := readi32(src)
523 return OpCode(i32), rem, ok
524 }
525
526
527
528 func ReadCompressedUncompressedSize(src []byte) (size int32, rem []byte, ok bool) {
529 return readi32(src)
530 }
531
532
533 func ReadCompressedCompressorID(src []byte) (id CompressorID, rem []byte, ok bool) {
534 if len(src) < 1 {
535 return 0, src, false
536 }
537 return CompressorID(src[0]), src[1:], true
538 }
539
540
// ReadCompressedCompressedMessage splits src into the first length bytes
// (msg) and everything after them (rem). Both results alias src; no data is
// copied.
//
// ok is false, msg is nil and rem is src when length is negative or exceeds
// len(src).
func ReadCompressedCompressedMessage(src []byte, length int32) (msg []byte, rem []byte, ok bool) {
	// length usually comes from untrusted wire data. A negative value would
	// pass the size comparison and then panic as a slice bound, so reject it
	// explicitly.
	if length < 0 || len(src) < int(length) {
		return nil, src, false
	}
	return src[:length], src[length:], true
}
547
548
549 func ReadKillCursorsZero(src []byte) (zero int32, rem []byte, ok bool) {
550 return readi32(src)
551 }
552
553
554 func ReadKillCursorsNumberIDs(src []byte) (numIDs int32, rem []byte, ok bool) {
555 return readi32(src)
556 }
557
558
559 func ReadKillCursorsCursorIDs(src []byte, numIDs int32) (cursorIDs []int64, rem []byte, ok bool) {
560 var i int32
561 var id int64
562 for i = 0; i < numIDs; i++ {
563 id, src, ok = readi64(src)
564 if !ok {
565 return cursorIDs, src, false
566 }
567
568 cursorIDs = append(cursorIDs, id)
569 }
570 return cursorIDs, src, true
571 }
572
// appendi32 appends i32 to dst as four bytes in little-endian order.
func appendi32(dst []byte, i32 int32) []byte {
	// Work on the unsigned bit pattern; the emitted bytes are the same.
	u := uint32(i32)
	return append(dst, byte(u), byte(u>>8), byte(u>>16), byte(u>>24))
}
576
// appendi64 appends i to b as eight bytes in little-endian order.
func appendi64(b []byte, i int64) []byte {
	// Work on the unsigned bit pattern; the emitted bytes are the same.
	u := uint64(i)
	return append(b,
		byte(u), byte(u>>8), byte(u>>16), byte(u>>24),
		byte(u>>32), byte(u>>40), byte(u>>48), byte(u>>56),
	)
}
580
// appendCString appends the bytes of str to b followed by a single 0x00
// terminator. str is copied as-is; embedded null bytes are not checked for.
func appendCString(b []byte, str string) []byte {
	return append(append(b, str...), 0x00)
}
585
// readi32 decodes a little-endian int32 from the first four bytes of src and
// returns it with the remaining bytes. If src is shorter than four bytes it
// returns (0, src, false).
func readi32(src []byte) (int32, []byte, bool) {
	const size = 4
	if len(src) < size {
		return 0, src, false
	}

	u := uint32(src[0]) | uint32(src[1])<<8 | uint32(src[2])<<16 | uint32(src[3])<<24
	return int32(u), src[size:], true
}
593
// readi32unsafe decodes a little-endian int32 from the first four bytes of
// src without checking the length first; it panics with an index-out-of-range
// error if src is shorter than four bytes.
func readi32unsafe(src []byte) int32 {
	u := uint32(src[0]) | uint32(src[1])<<8 | uint32(src[2])<<16 | uint32(src[3])<<24
	return int32(u)
}
597
// readi64 decodes a little-endian int64 from the first eight bytes of src and
// returns it with the remaining bytes. If src is shorter than eight bytes it
// returns (0, src, false).
func readi64(src []byte) (int64, []byte, bool) {
	const size = 8
	if len(src) < size {
		return 0, src, false
	}

	// Fold from the most significant byte (last on the wire) down to the first.
	var u uint64
	for i := size - 1; i >= 0; i-- {
		u = u<<8 | uint64(src[i])
	}
	return int64(u), src[size:], true
}
606
// readcstring reads a null-terminated string from the start of src. It
// returns the bytes before the first 0x00 as a string and the bytes after it
// as the remainder. If src contains no null byte it returns ("", src, false).
func readcstring(src []byte) (string, []byte, bool) {
	nul := bytes.IndexByte(src, 0x00)
	if nul < 0 {
		return "", src, false
	}
	str, rest := string(src[:nul]), src[nul+1:]
	return str, rest, true
}
614
View as plain text