...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/mtest/sent_message.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/mtest

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package mtest
     8  
     9  import (
    10  	"errors"
    11  	"fmt"
    12  
    13  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    14  	"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
    15  )
    16  
    17  // SentMessage represents a message sent by the driver to the server.
    18  type SentMessage struct {
    19  	RequestID  int32
    20  	RawMessage wiremessage.WireMessage
    21  	Command    bsoncore.Document
    22  	OpCode     wiremessage.OpCode
    23  
    24  	// The $readPreference document. This is separated into its own field even though it's included in the larger
    25  	// command document in both OP_QUERY and OP_MSG because OP_QUERY separates the command into a $query sub-document
    26  	// if there is a read preference. To unify OP_QUERY and OP_MSG, we pull this out into a separate field and set
    27  	// the Command field to the $query sub-document.
    28  	ReadPreference bsoncore.Document
    29  
    30  	// The documents sent for an insert, update, or delete command. This is separated into its own field because it's
    31  	// sent as part of the command document in OP_QUERY and as a document sequence outside the command document in
    32  	// OP_MSG.
    33  	DocumentSequence *bsoncore.DocumentSequence
    34  }
    35  
    36  type sentMsgParseFn func([]byte) (*SentMessage, error)
    37  
    38  func getSentMessageParser(opcode wiremessage.OpCode) (sentMsgParseFn, bool) {
    39  	switch opcode {
    40  	case wiremessage.OpQuery:
    41  		return parseOpQuery, true
    42  	case wiremessage.OpMsg:
    43  		return parseSentOpMsg, true
    44  	case wiremessage.OpCompressed:
    45  		return parseSentOpCompressed, true
    46  	default:
    47  		return nil, false
    48  	}
    49  }
    50  
    51  func parseOpQuery(wm []byte) (*SentMessage, error) {
    52  	var ok bool
    53  
    54  	if _, wm, ok = wiremessage.ReadQueryFlags(wm); !ok {
    55  		return nil, errors.New("failed to read query flags")
    56  	}
    57  	if _, wm, ok = wiremessage.ReadQueryFullCollectionName(wm); !ok {
    58  		return nil, errors.New("failed to read full collection name")
    59  	}
    60  	if _, wm, ok = wiremessage.ReadQueryNumberToSkip(wm); !ok {
    61  		return nil, errors.New("failed to read number to skip")
    62  	}
    63  	if _, wm, ok = wiremessage.ReadQueryNumberToReturn(wm); !ok {
    64  		return nil, errors.New("failed to read number to return")
    65  	}
    66  
    67  	query, wm, ok := wiremessage.ReadQueryQuery(wm)
    68  	if !ok {
    69  		return nil, errors.New("failed to read query")
    70  	}
    71  
    72  	// If there is no read preference document, the command document is query.
    73  	// Otherwise, query is in the format {$query: <command document>, $readPreference: <read preference document>}.
    74  	commandDoc := query
    75  	var rpDoc bsoncore.Document
    76  
    77  	dollarQueryVal, err := query.LookupErr("$query")
    78  	if err == nil {
    79  		commandDoc = dollarQueryVal.Document()
    80  
    81  		rpVal, err := query.LookupErr("$readPreference")
    82  		if err != nil {
    83  			return nil, fmt.Errorf("query %s contains $query but not $readPreference fields", query)
    84  		}
    85  		rpDoc = rpVal.Document()
    86  	}
    87  
    88  	// For OP_QUERY, inserts, updates, and deletes are sent as a BSON array of documents inside the main command
    89  	// document. Pull these sequences out into an ArrayStyle DocumentSequence.
    90  	var docSequence *bsoncore.DocumentSequence
    91  	cmdElems, _ := commandDoc.Elements()
    92  	for _, elem := range cmdElems {
    93  		switch elem.Key() {
    94  		case "documents", "updates", "deletes":
    95  			docSequence = &bsoncore.DocumentSequence{
    96  				Style: bsoncore.ArrayStyle,
    97  				Data:  elem.Value().Array(),
    98  			}
    99  		}
   100  		if docSequence != nil {
   101  			// There can only be one of these arrays in a well-formed command, so we exit the loop once one is found.
   102  			break
   103  		}
   104  	}
   105  
   106  	sm := &SentMessage{
   107  		Command:          commandDoc,
   108  		ReadPreference:   rpDoc,
   109  		DocumentSequence: docSequence,
   110  	}
   111  	return sm, nil
   112  }
   113  
   114  func parseSentMessage(wm []byte) (*SentMessage, error) {
   115  	// Re-assign the wire message to "remaining" so "wm" continues to point to the entire message after parsing.
   116  	_, requestID, _, opcode, remaining, ok := wiremessage.ReadHeader(wm)
   117  	if !ok {
   118  		return nil, errors.New("failed to read wiremessage header")
   119  	}
   120  
   121  	parseFn, ok := getSentMessageParser(opcode)
   122  	if !ok {
   123  		return nil, fmt.Errorf("unknown opcode: %v", opcode)
   124  	}
   125  	sent, err := parseFn(remaining)
   126  	if err != nil {
   127  		return nil, fmt.Errorf("error parsing wiremessage with opcode %s: %w", opcode, err)
   128  	}
   129  
   130  	sent.RequestID = requestID
   131  	sent.RawMessage = wm
   132  	sent.OpCode = opcode
   133  	return sent, nil
   134  }
   135  
   136  func parseSentOpMsg(wm []byte) (*SentMessage, error) {
   137  	var ok bool
   138  	var err error
   139  
   140  	if _, wm, ok = wiremessage.ReadMsgFlags(wm); !ok {
   141  		return nil, errors.New("failed to read flags")
   142  	}
   143  
   144  	if wm, err = assertMsgSectionType(wm, wiremessage.SingleDocument); err != nil {
   145  		return nil, fmt.Errorf("error verifying section type for command document: %w", err)
   146  	}
   147  
   148  	var commandDoc bsoncore.Document
   149  	commandDoc, wm, ok = wiremessage.ReadMsgSectionSingleDocument(wm)
   150  	if !ok {
   151  		return nil, errors.New("failed to read command document")
   152  	}
   153  
   154  	var rpDoc bsoncore.Document
   155  	if rpVal, err := commandDoc.LookupErr("$readPreference"); err == nil {
   156  		rpDoc = rpVal.Document()
   157  	}
   158  
   159  	var docSequence *bsoncore.DocumentSequence
   160  	if len(wm) != 0 {
   161  		// If there are bytes remaining in the wire message, they must correspond to a DocumentSequence section.
   162  		if wm, err = assertMsgSectionType(wm, wiremessage.DocumentSequence); err != nil {
   163  			return nil, fmt.Errorf("error verifying section type for document sequence: %w", err)
   164  		}
   165  
   166  		var data []byte
   167  		_, data, wm, ok = wiremessage.ReadMsgSectionRawDocumentSequence(wm)
   168  		if !ok {
   169  			return nil, errors.New("failed to read document sequence")
   170  		}
   171  
   172  		docSequence = &bsoncore.DocumentSequence{
   173  			Style: bsoncore.SequenceStyle,
   174  			Data:  data,
   175  		}
   176  	}
   177  
   178  	sm := &SentMessage{
   179  		Command:          commandDoc,
   180  		ReadPreference:   rpDoc,
   181  		DocumentSequence: docSequence,
   182  	}
   183  	return sm, nil
   184  }
   185  
   186  func parseSentOpCompressed(wm []byte) (*SentMessage, error) {
   187  	originalOpcode, wm, err := parseOpCompressed(wm)
   188  	if err != nil {
   189  		return nil, err
   190  	}
   191  
   192  	parser, ok := getSentMessageParser(originalOpcode)
   193  	if !ok {
   194  		return nil, fmt.Errorf("unknown original opcode %v", originalOpcode)
   195  	}
   196  	return parser(wm)
   197  }
   198  

View as plain text