...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/mtest/opmsg_deployment.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/mtest

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package mtest
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  
    13  	"go.mongodb.org/mongo-driver/bson"
    14  	"go.mongodb.org/mongo-driver/internal/csot"
    15  	"go.mongodb.org/mongo-driver/mongo/address"
    16  	"go.mongodb.org/mongo-driver/mongo/description"
    17  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    18  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    19  	"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
    20  	"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
    21  )
    22  
    23  const (
    24  	serverAddress          = address.Address("127.0.0.1:27017")
    25  	maxDocumentSize uint32 = 16777216
    26  	maxMessageSize  uint32 = 48000000
    27  	maxBatchCount   uint32 = 100000
    28  )
    29  
    30  var (
    31  	sessionTimeoutMinutes      uint32 = 30
    32  	sessionTimeoutMinutesInt64        = int64(sessionTimeoutMinutes)
    33  
    34  	// MockDescription is the server description used for the mock deployment. Each mocked connection returns this
    35  	// value from its Description method.
    36  	MockDescription = description.Server{
    37  		CanonicalAddr:   serverAddress,
    38  		MaxDocumentSize: maxDocumentSize,
    39  		MaxMessageSize:  maxMessageSize,
    40  		MaxBatchCount:   maxBatchCount,
    41  		// TODO(GODRIVER-2885): This can be removed once legacy
    42  		// SessionTimeoutMinutes is removed.
    43  		SessionTimeoutMinutes:    sessionTimeoutMinutes,
    44  		SessionTimeoutMinutesPtr: &sessionTimeoutMinutesInt64,
    45  		Kind:                     description.RSPrimary,
    46  		WireVersion: &description.VersionRange{
    47  			Max: topology.SupportedWireVersions.Max,
    48  		},
    49  	}
    50  )
    51  
    52  // connection implements the driver.Connection interface and responds to wire messages with pre-configured responses.
    53  type connection struct {
    54  	responses []bson.D // responses to send when ReadWireMessage is called
    55  }
    56  
    57  var _ driver.Connection = &connection{}
    58  
    59  // WriteWireMessage is a no-op.
    60  func (c *connection) WriteWireMessage(context.Context, []byte) error {
    61  	return nil
    62  }
    63  
    64  // ReadWireMessage returns the next response in the connection's list of responses.
    65  func (c *connection) ReadWireMessage(_ context.Context) ([]byte, error) {
    66  	var dst []byte
    67  	if len(c.responses) == 0 {
    68  		return dst, errors.New("no responses remaining")
    69  	}
    70  	nextRes := c.responses[0]
    71  	c.responses = c.responses[1:]
    72  
    73  	var wmindex int32
    74  	wmindex, dst = wiremessage.AppendHeaderStart(dst, wiremessage.NextRequestID(), 0, wiremessage.OpMsg)
    75  	dst = wiremessage.AppendMsgFlags(dst, 0)
    76  	dst = wiremessage.AppendMsgSectionType(dst, wiremessage.SingleDocument)
    77  	resBytes, _ := bson.Marshal(nextRes)
    78  	dst = append(dst, resBytes...)
    79  	dst = bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:])))
    80  	return dst, nil
    81  }
    82  
    83  // Description returns a fixed server description for the connection.
    84  func (c *connection) Description() description.Server {
    85  	return MockDescription
    86  }
    87  
    88  // Close is a no-op operation.
    89  func (*connection) Close() error {
    90  	return nil
    91  }
    92  
    93  // ID returns a fixed identifier for the connection.
    94  func (*connection) ID() string {
    95  	return "<mock_connection>"
    96  }
    97  
    98  // DriverConnectionID returns a fixed identifier for the driver pool connection.
    99  // TODO(GODRIVER-2824): replace return type with int64.
   100  func (*connection) DriverConnectionID() uint64 {
   101  	return 0
   102  }
   103  
   104  // ServerConnectionID returns a fixed identifier for the server connection.
   105  func (*connection) ServerConnectionID() *int64 {
   106  	serverConnectionID := int64(42)
   107  	return &serverConnectionID
   108  }
   109  
   110  // Address returns a fixed address for the connection.
   111  func (*connection) Address() address.Address {
   112  	return serverAddress
   113  }
   114  
   115  // Stale returns if the connection is stale.
   116  func (*connection) Stale() bool {
   117  	return false
   118  }
   119  
   120  // mockDeployment wraps a connection and implements the driver.Deployment interface.
   121  type mockDeployment struct {
   122  	conn    *connection
   123  	updates chan description.Topology
   124  }
   125  
   126  var _ driver.Deployment = &mockDeployment{}
   127  var _ driver.Server = &mockDeployment{}
   128  var _ driver.Connector = &mockDeployment{}
   129  var _ driver.Disconnector = &mockDeployment{}
   130  var _ driver.Subscriber = &mockDeployment{}
   131  
   132  // SelectServer implements the Deployment interface. This method does not use the
   133  // description.SelectedServer provided and instead returns itself. The Connections returned from the
   134  // Connection method have a no-op Close method.
   135  func (md *mockDeployment) SelectServer(context.Context, description.ServerSelector) (driver.Server, error) {
   136  	return md, nil
   137  }
   138  
   139  // Kind implements the Deployment interface. It always returns description.Single.
   140  func (md *mockDeployment) Kind() description.TopologyKind {
   141  	return description.Single
   142  }
   143  
   144  // Connection implements the driver.Server interface.
   145  func (md *mockDeployment) Connection(context.Context) (driver.Connection, error) {
   146  	return md.conn, nil
   147  }
   148  
   149  // RTTMonitor implements the driver.Server interface.
   150  func (md *mockDeployment) RTTMonitor() driver.RTTMonitor {
   151  	return &csot.ZeroRTTMonitor{}
   152  }
   153  
   154  // Connect is a no-op method which implements the driver.Connector interface.
   155  func (md *mockDeployment) Connect() error {
   156  	return nil
   157  }
   158  
   159  // Disconnect is a no-op method which implements the driver.Disconnector interface {
   160  func (md *mockDeployment) Disconnect(context.Context) error {
   161  	close(md.updates)
   162  	return nil
   163  }
   164  
   165  // Subscribe returns a subscription from which new topology descriptions can be retrieved.
   166  // Subscribe implements the driver.Subscriber interface.
   167  func (md *mockDeployment) Subscribe() (*driver.Subscription, error) {
   168  	if md.updates == nil {
   169  		md.updates = make(chan description.Topology, 1)
   170  
   171  		md.updates <- description.Topology{
   172  			SessionTimeoutMinutesPtr: &sessionTimeoutMinutesInt64,
   173  
   174  			// TODO(GODRIVER-2885): This can be removed once legacy
   175  			// SessionTimeoutMinutes is removed.
   176  			SessionTimeoutMinutes: sessionTimeoutMinutes,
   177  		}
   178  	}
   179  
   180  	return &driver.Subscription{
   181  		Updates: md.updates,
   182  	}, nil
   183  }
   184  
   185  // Unsubscribe is a no-op method which implements the driver.Subscriber interface.
   186  func (md *mockDeployment) Unsubscribe(*driver.Subscription) error {
   187  	return nil
   188  }
   189  
   190  // addResponses adds responses to this mock deployment.
   191  func (md *mockDeployment) addResponses(responses ...bson.D) {
   192  	md.conn.responses = append(md.conn.responses, responses...)
   193  }
   194  
   195  // clearResponses clears all remaining responses in this mock deployment.
   196  func (md *mockDeployment) clearResponses() {
   197  	md.conn.responses = md.conn.responses[:0]
   198  }
   199  
   200  // newMockDeployment returns a mock driver.Deployment that responds with OP_MSG wire messages.
   201  func newMockDeployment(responses ...bson.D) *mockDeployment {
   202  	return &mockDeployment{
   203  		conn: &connection{
   204  			responses: responses,
   205  		},
   206  	}
   207  }
   208  

View as plain text