...
1
2
3
4
5
6
7 package mtest
8
9 import (
10 "context"
11 "errors"
12
13 "go.mongodb.org/mongo-driver/bson"
14 "go.mongodb.org/mongo-driver/internal/csot"
15 "go.mongodb.org/mongo-driver/mongo/address"
16 "go.mongodb.org/mongo-driver/mongo/description"
17 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
18 "go.mongodb.org/mongo-driver/x/mongo/driver"
19 "go.mongodb.org/mongo-driver/x/mongo/driver/topology"
20 "go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
21 )
22
23 const (
24 serverAddress = address.Address("127.0.0.1:27017")
25 maxDocumentSize uint32 = 16777216
26 maxMessageSize uint32 = 48000000
27 maxBatchCount uint32 = 100000
28 )
29
30 var (
31 sessionTimeoutMinutes uint32 = 30
32 sessionTimeoutMinutesInt64 = int64(sessionTimeoutMinutes)
33
34
35
36 MockDescription = description.Server{
37 CanonicalAddr: serverAddress,
38 MaxDocumentSize: maxDocumentSize,
39 MaxMessageSize: maxMessageSize,
40 MaxBatchCount: maxBatchCount,
41
42
43 SessionTimeoutMinutes: sessionTimeoutMinutes,
44 SessionTimeoutMinutesPtr: &sessionTimeoutMinutesInt64,
45 Kind: description.RSPrimary,
46 WireVersion: &description.VersionRange{
47 Max: topology.SupportedWireVersions.Max,
48 },
49 }
50 )
51
52
53 type connection struct {
54 responses []bson.D
55 }
56
57 var _ driver.Connection = &connection{}
58
59
60 func (c *connection) WriteWireMessage(context.Context, []byte) error {
61 return nil
62 }
63
64
65 func (c *connection) ReadWireMessage(_ context.Context) ([]byte, error) {
66 var dst []byte
67 if len(c.responses) == 0 {
68 return dst, errors.New("no responses remaining")
69 }
70 nextRes := c.responses[0]
71 c.responses = c.responses[1:]
72
73 var wmindex int32
74 wmindex, dst = wiremessage.AppendHeaderStart(dst, wiremessage.NextRequestID(), 0, wiremessage.OpMsg)
75 dst = wiremessage.AppendMsgFlags(dst, 0)
76 dst = wiremessage.AppendMsgSectionType(dst, wiremessage.SingleDocument)
77 resBytes, _ := bson.Marshal(nextRes)
78 dst = append(dst, resBytes...)
79 dst = bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:])))
80 return dst, nil
81 }
82
83
84 func (c *connection) Description() description.Server {
85 return MockDescription
86 }
87
88
89 func (*connection) Close() error {
90 return nil
91 }
92
93
94 func (*connection) ID() string {
95 return "<mock_connection>"
96 }
97
98
99
100 func (*connection) DriverConnectionID() uint64 {
101 return 0
102 }
103
104
105 func (*connection) ServerConnectionID() *int64 {
106 serverConnectionID := int64(42)
107 return &serverConnectionID
108 }
109
110
111 func (*connection) Address() address.Address {
112 return serverAddress
113 }
114
115
116 func (*connection) Stale() bool {
117 return false
118 }
119
120
121 type mockDeployment struct {
122 conn *connection
123 updates chan description.Topology
124 }
125
126 var _ driver.Deployment = &mockDeployment{}
127 var _ driver.Server = &mockDeployment{}
128 var _ driver.Connector = &mockDeployment{}
129 var _ driver.Disconnector = &mockDeployment{}
130 var _ driver.Subscriber = &mockDeployment{}
131
132
133
134
135 func (md *mockDeployment) SelectServer(context.Context, description.ServerSelector) (driver.Server, error) {
136 return md, nil
137 }
138
139
140 func (md *mockDeployment) Kind() description.TopologyKind {
141 return description.Single
142 }
143
144
145 func (md *mockDeployment) Connection(context.Context) (driver.Connection, error) {
146 return md.conn, nil
147 }
148
149
150 func (md *mockDeployment) RTTMonitor() driver.RTTMonitor {
151 return &csot.ZeroRTTMonitor{}
152 }
153
154
155 func (md *mockDeployment) Connect() error {
156 return nil
157 }
158
159
160 func (md *mockDeployment) Disconnect(context.Context) error {
161 close(md.updates)
162 return nil
163 }
164
165
166
167 func (md *mockDeployment) Subscribe() (*driver.Subscription, error) {
168 if md.updates == nil {
169 md.updates = make(chan description.Topology, 1)
170
171 md.updates <- description.Topology{
172 SessionTimeoutMinutesPtr: &sessionTimeoutMinutesInt64,
173
174
175
176 SessionTimeoutMinutes: sessionTimeoutMinutes,
177 }
178 }
179
180 return &driver.Subscription{
181 Updates: md.updates,
182 }, nil
183 }
184
185
186 func (md *mockDeployment) Unsubscribe(*driver.Subscription) error {
187 return nil
188 }
189
190
191 func (md *mockDeployment) addResponses(responses ...bson.D) {
192 md.conn.responses = append(md.conn.responses, responses...)
193 }
194
195
196 func (md *mockDeployment) clearResponses() {
197 md.conn.responses = md.conn.responses[:0]
198 }
199
200
201 func newMockDeployment(responses ...bson.D) *mockDeployment {
202 return &mockDeployment{
203 conn: &connection{
204 responses: responses,
205 },
206 }
207 }
208
View as plain text