1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "net"
12 "os"
13 "runtime"
14 "testing"
15 "time"
16
17 "go.mongodb.org/mongo-driver/bson/primitive"
18 "go.mongodb.org/mongo-driver/event"
19 "go.mongodb.org/mongo-driver/internal/assert"
20 "go.mongodb.org/mongo-driver/internal/handshake"
21 "go.mongodb.org/mongo-driver/internal/require"
22 "go.mongodb.org/mongo-driver/mongo/address"
23 "go.mongodb.org/mongo-driver/mongo/description"
24 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
25 "go.mongodb.org/mongo-driver/mongo/options"
26 "go.mongodb.org/mongo-driver/x/mongo/driver/topology"
27 )
28
29 func TestSDAMProse(t *testing.T) {
30 mt := mtest.New(t)
31
32
33
34
35 heartbeatInterval := 500 * time.Millisecond
36 heartbeatIntervalClientOpts := options.Client().
37 SetHeartbeatInterval(heartbeatInterval)
38 heartbeatIntervalMtOpts := mtest.NewOptions().
39 ClientOptions(heartbeatIntervalClientOpts).
40 CreateCollection(false).
41 ClientType(mtest.Proxy).
42 MinServerVersion("4.4")
43 mt.RunOpts("heartbeats processed more frequently", heartbeatIntervalMtOpts, func(mt *mtest.T) {
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 if len(os.Getenv("DOCKER_RUNNING")) > 0 {
64 mt.Skip("skipping test in docker environment")
65 }
66 start := time.Now()
67 time.Sleep(2 * time.Second)
68 messages := mt.GetProxiedMessages()
69 duration := time.Since(start)
70
71 numNodes := len(options.Client().ApplyURI(mtest.ClusterURI()).Hosts)
72 maxExpected := numNodes * (2 + 2*int(duration/heartbeatInterval))
73 minExpected := numNodes * (2 + 2*int(duration/(heartbeatInterval*2)))
74
75 assert.True(
76 mt,
77 len(messages) >= minExpected && len(messages) <= maxExpected,
78 "expected number of messages to be in range [%d, %d], got %d"+
79 " (num nodes = %d, duration = %v, interval = %v)",
80 minExpected,
81 maxExpected,
82 len(messages),
83 numNodes,
84 duration,
85 heartbeatInterval)
86 })
87
88 mt.RunOpts("rtt tests", noClientOpts, func(mt *mtest.T) {
89 clientOpts := options.Client().
90 SetHeartbeatInterval(500 * time.Millisecond).
91 SetAppName("streamingRttTest")
92 mtOpts := mtest.NewOptions().
93 MinServerVersion("4.4").
94 ClientOptions(clientOpts)
95 mt.RunOpts("rtt is continuously updated", mtOpts, func(mt *mtest.T) {
96
97
98
99
100 testTopology := getTopologyFromClient(mt.Client)
101 time.Sleep(2 * time.Second)
102 for _, serverDesc := range testTopology.Description().Servers {
103 assert.NotEqual(mt, description.Unknown, serverDesc.Kind, "server %v is Unknown", serverDesc)
104 assert.True(mt, serverDesc.AverageRTTSet, "AverageRTTSet for server description %v is false", serverDesc)
105
106 if runtime.GOOS != "windows" {
107
108
109
110 assert.True(mt, serverDesc.AverageRTT > 0, "server description %v has 0 RTT", serverDesc)
111 }
112 }
113
114
115 mt.SetFailPoint(mtest.FailPoint{
116 ConfigureFailPoint: "failCommand",
117 Mode: mtest.FailPointMode{
118 Times: 1000,
119 },
120 Data: mtest.FailPointData{
121 FailCommands: []string{handshake.LegacyHello, "hello"},
122 BlockConnection: true,
123 BlockTimeMS: 500,
124 AppName: "streamingRttTest",
125 },
126 })
127 callback := func(ctx context.Context) {
128 for {
129
130 select {
131 case <-ctx.Done():
132 return
133 default:
134 }
135
136
137
138 for _, serverDesc := range testTopology.Description().Servers {
139 if serverDesc.AverageRTT > 250*time.Millisecond {
140 return
141 }
142 }
143
144
145 time.Sleep(500 * time.Millisecond)
146 }
147 }
148 assert.Soon(t, callback, defaultCallbackTimeout)
149 })
150 })
151
152 mt.RunOpts("client waits between failed Hellos", mtest.NewOptions().MinServerVersion("4.9").Topologies(mtest.Single), func(mt *mtest.T) {
153
154 mt.SetFailPoint(mtest.FailPoint{
155 ConfigureFailPoint: "failCommand",
156 Mode: mtest.FailPointMode{
157 Times: 5,
158 },
159 Data: mtest.FailPointData{
160 FailCommands: []string{handshake.LegacyHello, "hello"},
161 ErrorCode: 1234,
162 AppName: "SDAMMinHeartbeatFrequencyTest",
163 },
164 })
165
166
167 clientOpts := options.Client().SetDirect(true).
168 SetAppName("SDAMMinHeartbeatFrequencyTest").
169 SetServerSelectionTimeout(5 * time.Second)
170 mt.ResetClient(clientOpts)
171
172
173 start := time.Now()
174 err := mt.Client.Ping(context.Background(), nil)
175 assert.Nil(mt, err, "Ping error: %v", err)
176 pingTime := time.Since(start)
177 assert.True(mt, pingTime > 2000*time.Millisecond && pingTime < 3500*time.Millisecond,
178 "expected Ping to take between 2 and 3.5 seconds, took %v seconds", pingTime.Seconds())
179
180 })
181 }
182
183 func TestServerHeartbeatStartedEvent(t *testing.T) {
184 t.Run("emits the first HeartbeatStartedEvent before the monitoring socket was created", func(t *testing.T) {
185 t.Parallel()
186
187 const address = address.Address("localhost:9999")
188 expectedEvents := []string{
189 "serverHeartbeatStartedEvent",
190 "client connected",
191 "client hello received",
192 "serverHeartbeatFailedEvent",
193 }
194
195 events := make(chan string)
196
197 listener, err := net.Listen("tcp", address.String())
198 assert.NoError(t, err)
199 defer listener.Close()
200 go func() {
201 conn, err := listener.Accept()
202 assert.NoError(t, err)
203 defer conn.Close()
204
205 events <- "client connected"
206 _, _ = conn.Read(nil)
207 events <- "client hello received"
208 }()
209
210 server := topology.NewServer(
211 address,
212 primitive.NewObjectID(),
213 topology.WithServerMonitor(func(*event.ServerMonitor) *event.ServerMonitor {
214 return &event.ServerMonitor{
215 ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
216 events <- "serverHeartbeatStartedEvent"
217 },
218 ServerHeartbeatFailed: func(e *event.ServerHeartbeatFailedEvent) {
219 events <- "serverHeartbeatFailedEvent"
220 },
221 }
222 }),
223 )
224 require.NoError(t, server.Connect(nil))
225
226 ticker := time.NewTicker(5 * time.Second)
227 defer ticker.Stop()
228
229 actualEvents := make([]string, 0, len(expectedEvents))
230 for len(actualEvents) < len(expectedEvents) {
231 select {
232 case event := <-events:
233 actualEvents = append(actualEvents, event)
234 case <-ticker.C:
235 assert.FailNow(t, "timed out for incoming event")
236 }
237 }
238 assert.Equal(t, expectedEvents, actualEvents)
239 })
240 }
241
View as plain text