1
2
3
4
5
6
7 package description
8
9 import (
10 "encoding/json"
11 "io/ioutil"
12 "path"
13 "strconv"
14 "testing"
15 "time"
16
17 "go.mongodb.org/mongo-driver/internal/require"
18 "go.mongodb.org/mongo-driver/mongo/address"
19 "go.mongodb.org/mongo-driver/mongo/readpref"
20 "go.mongodb.org/mongo-driver/tag"
21 )
22
23 type testCase struct {
24 TopologyDescription topDesc `json:"topology_description"`
25 Operation string `json:"operation"`
26 ReadPreference readPref `json:"read_preference"`
27 SuitableServers []*serverDesc `json:"suitable_servers"`
28 InLatencyWindow []*serverDesc `json:"in_latency_window"`
29 HeartbeatFrequencyMS *int `json:"heartbeatFrequencyMS"`
30 Error *bool
31 }
32
33 type topDesc struct {
34 Type string `json:"type"`
35 Servers []*serverDesc `json:"servers"`
36 }
37
38 type serverDesc struct {
39 Address string `json:"address"`
40 AverageRTTMS *int `json:"avg_rtt_ms"`
41 MaxWireVersion *int32 `json:"maxWireVersion"`
42 LastUpdateTime *int `json:"lastUpdateTime"`
43 LastWrite *lastWriteDate `json:"lastWrite"`
44 Type string `json:"type"`
45 Tags map[string]string `json:"tags"`
46 }
47
48 type lastWriteDate struct {
49 LastWriteDate lastWriteDateInner `json:"lastWriteDate"`
50 }
51
52
53 type lastWriteDateInner struct {
54 Value string `json:"$numberLong"`
55 }
56
57 type readPref struct {
58 MaxStaleness *int `json:"maxStalenessSeconds"`
59 Mode string `json:"mode"`
60 TagSets []map[string]string `json:"tag_sets"`
61 }
62
63 func topologyKindFromString(t *testing.T, s string) TopologyKind {
64 t.Helper()
65
66 switch s {
67 case "Single":
68 return Single
69 case "ReplicaSet":
70 return ReplicaSet
71 case "ReplicaSetNoPrimary":
72 return ReplicaSetNoPrimary
73 case "ReplicaSetWithPrimary":
74 return ReplicaSetWithPrimary
75 case "Sharded":
76 return Sharded
77 case "LoadBalanced":
78 return LoadBalanced
79 case "Unknown":
80 return Unknown
81 default:
82 t.Fatalf("unrecognized topology kind: %q", s)
83 }
84
85 return Unknown
86 }
87
88 func serverKindFromString(t *testing.T, s string) ServerKind {
89 t.Helper()
90
91 switch s {
92 case "Standalone":
93 return Standalone
94 case "RSOther":
95 return RSMember
96 case "RSPrimary":
97 return RSPrimary
98 case "RSSecondary":
99 return RSSecondary
100 case "RSArbiter":
101 return RSArbiter
102 case "RSGhost":
103 return RSGhost
104 case "Mongos":
105 return Mongos
106 case "LoadBalancer":
107 return LoadBalancer
108 case "PossiblePrimary", "Unknown":
109
110 return Unknown
111 default:
112 t.Fatalf("unrecognized server kind: %q", s)
113 }
114
115 return Unknown
116 }
117
118 func findServerByAddress(servers []Server, address string) Server {
119 for _, server := range servers {
120 if server.Addr.String() == address {
121 return server
122 }
123 }
124
125 return Server{}
126 }
127
128 func anyTagsInSets(sets []tag.Set) bool {
129 for _, set := range sets {
130 if len(set) > 0 {
131 return true
132 }
133 }
134
135 return false
136 }
137
138 func compareServers(t *testing.T, expected []*serverDesc, actual []Server) {
139 require.Equal(t, len(expected), len(actual))
140
141 for _, expectedServer := range expected {
142 actualServer := findServerByAddress(actual, expectedServer.Address)
143 require.NotNil(t, actualServer)
144
145 if expectedServer.AverageRTTMS != nil {
146 require.Equal(t, *expectedServer.AverageRTTMS, int(actualServer.AverageRTT/time.Millisecond))
147 }
148
149 require.Equal(t, expectedServer.Type, actualServer.Kind.String())
150
151 require.Equal(t, len(expectedServer.Tags), len(actualServer.Tags))
152 for _, actualTag := range actualServer.Tags {
153 expectedTag, ok := expectedServer.Tags[actualTag.Name]
154 require.True(t, ok)
155 require.Equal(t, expectedTag, actualTag.Value)
156 }
157 }
158 }
159
160 func selectServers(t *testing.T, test *testCase) error {
161 servers := make([]Server, 0, len(test.TopologyDescription.Servers))
162
163
164
165
166 baseTime := time.Now()
167
168 for _, serverDescription := range test.TopologyDescription.Servers {
169 server := Server{
170 Addr: address.Address(serverDescription.Address),
171 Kind: serverKindFromString(t, serverDescription.Type),
172 }
173
174 if serverDescription.AverageRTTMS != nil {
175 server.AverageRTT = time.Duration(*serverDescription.AverageRTTMS) * time.Millisecond
176 server.AverageRTTSet = true
177 }
178
179 if test.HeartbeatFrequencyMS != nil {
180 server.HeartbeatInterval = time.Duration(*test.HeartbeatFrequencyMS) * time.Millisecond
181 }
182
183 if serverDescription.LastUpdateTime != nil {
184 ms := int64(*serverDescription.LastUpdateTime)
185 server.LastUpdateTime = time.Unix(ms/1e3, ms%1e3/1e6)
186 }
187
188 if serverDescription.LastWrite != nil {
189 i, err := strconv.ParseInt(serverDescription.LastWrite.LastWriteDate.Value, 10, 64)
190
191 if err != nil {
192 return err
193 }
194
195 timeWithOffset := baseTime.Add(time.Duration(i) * time.Millisecond)
196 server.LastWriteTime = timeWithOffset
197 }
198
199 if serverDescription.MaxWireVersion != nil {
200 versionRange := NewVersionRange(0, *serverDescription.MaxWireVersion)
201 server.WireVersion = &versionRange
202 }
203
204 if serverDescription.Tags != nil {
205 server.Tags = tag.NewTagSetFromMap(serverDescription.Tags)
206 }
207
208 if test.ReadPreference.MaxStaleness != nil && server.WireVersion == nil {
209 server.WireVersion = &VersionRange{Max: 21}
210 }
211
212 servers = append(servers, server)
213 }
214
215 c := Topology{
216 Kind: topologyKindFromString(t, test.TopologyDescription.Type),
217 Servers: servers,
218 }
219
220 if len(test.ReadPreference.Mode) == 0 {
221 test.ReadPreference.Mode = "Primary"
222 }
223
224 readprefMode, err := readpref.ModeFromString(test.ReadPreference.Mode)
225 if err != nil {
226 return err
227 }
228
229 options := make([]readpref.Option, 0, 1)
230
231 tagSets := tag.NewTagSetsFromMaps(test.ReadPreference.TagSets)
232 if anyTagsInSets(tagSets) {
233 options = append(options, readpref.WithTagSets(tagSets...))
234 }
235
236 if test.ReadPreference.MaxStaleness != nil {
237 s := time.Duration(*test.ReadPreference.MaxStaleness) * time.Second
238 options = append(options, readpref.WithMaxStaleness(s))
239 }
240
241 rp, err := readpref.New(readprefMode, options...)
242 if err != nil {
243 return err
244 }
245
246 selector := ReadPrefSelector(rp)
247 if test.Operation == "write" {
248 selector = CompositeSelector(
249 []ServerSelector{WriteSelector(), selector},
250 )
251 }
252
253 result, err := selector.SelectServer(c, c.Servers)
254 if err != nil {
255 return err
256 }
257
258 compareServers(t, test.SuitableServers, result)
259
260 latencySelector := LatencySelector(time.Duration(15) * time.Millisecond)
261 selector = CompositeSelector(
262 []ServerSelector{selector, latencySelector},
263 )
264
265 result, err = selector.SelectServer(c, c.Servers)
266 if err != nil {
267 return err
268 }
269
270 compareServers(t, test.InLatencyWindow, result)
271
272 return nil
273 }
274
275 func runTest(t *testing.T, testsDir string, directory string, filename string) {
276 filepath := path.Join(testsDir, directory, filename)
277 content, err := ioutil.ReadFile(filepath)
278 require.NoError(t, err)
279
280
281 filename = filename[:len(filename)-5]
282 testName := directory + "/" + filename + ":"
283
284 t.Run(testName, func(t *testing.T) {
285 var test testCase
286 require.NoError(t, json.Unmarshal(content, &test))
287
288 err := selectServers(t, &test)
289
290 if test.Error == nil || !*test.Error {
291 require.NoError(t, err)
292 } else {
293 require.Error(t, err)
294 }
295 })
296 }
297
View as plain text