1
2
3
4
5
6
7 package description
8
9 import (
10 "testing"
11 "time"
12
13 "github.com/google/go-cmp/cmp"
14 "go.mongodb.org/mongo-driver/internal/assert"
15 "go.mongodb.org/mongo-driver/internal/require"
16 "go.mongodb.org/mongo-driver/mongo/address"
17 "go.mongodb.org/mongo-driver/mongo/readpref"
18 "go.mongodb.org/mongo-driver/tag"
19 )
20
21 func TestServerSelection(t *testing.T) {
22 noerr := func(t *testing.T, err error) {
23 if err != nil {
24 t.Errorf("Unepexted error: %v", err)
25 t.FailNow()
26 }
27 }
28
29 t.Run("WriteSelector", func(t *testing.T) {
30 testCases := []struct {
31 name string
32 desc Topology
33 start int
34 end int
35 }{
36 {
37 name: "ReplicaSetWithPrimary",
38 desc: Topology{
39 Kind: ReplicaSetWithPrimary,
40 Servers: []Server{
41 {Addr: address.Address("localhost:27017"), Kind: RSPrimary},
42 {Addr: address.Address("localhost:27018"), Kind: RSSecondary},
43 {Addr: address.Address("localhost:27019"), Kind: RSSecondary},
44 },
45 },
46 start: 0,
47 end: 1,
48 },
49 {
50 name: "ReplicaSetNoPrimary",
51 desc: Topology{
52 Kind: ReplicaSetNoPrimary,
53 Servers: []Server{
54 {Addr: address.Address("localhost:27018"), Kind: RSSecondary},
55 {Addr: address.Address("localhost:27019"), Kind: RSSecondary},
56 },
57 },
58 start: 0,
59 end: 0,
60 },
61 {
62 name: "Sharded",
63 desc: Topology{
64 Kind: Sharded,
65 Servers: []Server{
66 {Addr: address.Address("localhost:27018"), Kind: Mongos},
67 {Addr: address.Address("localhost:27019"), Kind: Mongos},
68 },
69 },
70 start: 0,
71 end: 2,
72 },
73 {
74 name: "Single",
75 desc: Topology{
76 Kind: Single,
77 Servers: []Server{
78 {Addr: address.Address("localhost:27018"), Kind: Standalone},
79 },
80 },
81 start: 0,
82 end: 1,
83 },
84 }
85
86 for _, tc := range testCases {
87 t.Run(tc.name, func(t *testing.T) {
88 result, err := WriteSelector().SelectServer(tc.desc, tc.desc.Servers)
89 noerr(t, err)
90 if len(result) != tc.end-tc.start {
91 t.Errorf("Incorrect number of servers selected. got %d; want %d", len(result), tc.end-tc.start)
92 }
93 if diff := cmp.Diff(result, tc.desc.Servers[tc.start:tc.end]); diff != "" {
94 t.Errorf("Incorrect servers selected (-got +want):\n%s", diff)
95 }
96 })
97 }
98 })
99 t.Run("LatencySelector", func(t *testing.T) {
100 testCases := []struct {
101 name string
102 desc Topology
103 start int
104 end int
105 }{
106 {
107 name: "NoRTTSet",
108 desc: Topology{
109 Servers: []Server{
110 {Addr: address.Address("localhost:27017")},
111 {Addr: address.Address("localhost:27018")},
112 {Addr: address.Address("localhost:27019")},
113 },
114 },
115 start: 0,
116 end: 3,
117 },
118 {
119 name: "MultipleServers PartialNoRTTSet",
120 desc: Topology{
121 Servers: []Server{
122 {Addr: address.Address("localhost:27017"), AverageRTT: 5 * time.Second, AverageRTTSet: true},
123 {Addr: address.Address("localhost:27018"), AverageRTT: 10 * time.Second, AverageRTTSet: true},
124 {Addr: address.Address("localhost:27019")},
125 },
126 },
127 start: 0,
128 end: 2,
129 },
130 {
131 name: "MultipleServers",
132 desc: Topology{
133 Servers: []Server{
134 {Addr: address.Address("localhost:27017"), AverageRTT: 5 * time.Second, AverageRTTSet: true},
135 {Addr: address.Address("localhost:27018"), AverageRTT: 10 * time.Second, AverageRTTSet: true},
136 {Addr: address.Address("localhost:27019"), AverageRTT: 26 * time.Second, AverageRTTSet: true},
137 },
138 },
139 start: 0,
140 end: 2,
141 },
142 {
143 name: "No Servers",
144 desc: Topology{Servers: []Server{}},
145 start: 0,
146 end: 0,
147 },
148 {
149 name: "1 Server",
150 desc: Topology{
151 Servers: []Server{
152 {Addr: address.Address("localhost:27017"), AverageRTT: 26 * time.Second, AverageRTTSet: true},
153 },
154 },
155 start: 0,
156 end: 1,
157 },
158 }
159
160 for _, tc := range testCases {
161 t.Run(tc.name, func(t *testing.T) {
162 result, err := LatencySelector(20*time.Second).SelectServer(tc.desc, tc.desc.Servers)
163 noerr(t, err)
164 if len(result) != tc.end-tc.start {
165 t.Errorf("Incorrect number of servers selected. got %d; want %d", len(result), tc.end-tc.start)
166 }
167 if diff := cmp.Diff(result, tc.desc.Servers[tc.start:tc.end]); diff != "" {
168 t.Errorf("Incorrect servers selected (-got +want):\n%s", diff)
169 }
170 })
171 }
172 })
173 }
174
175 var readPrefTestPrimary = Server{
176 Addr: address.Address("localhost:27017"),
177 HeartbeatInterval: time.Duration(10) * time.Second,
178 LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
179 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
180 Kind: RSPrimary,
181 Tags: tag.Set{tag.Tag{Name: "a", Value: "1"}},
182 WireVersion: &VersionRange{Min: 6, Max: 21},
183 }
184 var readPrefTestSecondary1 = Server{
185 Addr: address.Address("localhost:27018"),
186 HeartbeatInterval: time.Duration(10) * time.Second,
187 LastWriteTime: time.Date(2017, 2, 11, 13, 58, 0, 0, time.UTC),
188 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
189 Kind: RSSecondary,
190 Tags: tag.Set{tag.Tag{Name: "a", Value: "1"}},
191 WireVersion: &VersionRange{Min: 6, Max: 21},
192 }
193 var readPrefTestSecondary2 = Server{
194 Addr: address.Address("localhost:27018"),
195 HeartbeatInterval: time.Duration(10) * time.Second,
196 LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
197 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
198 Kind: RSSecondary,
199 Tags: tag.Set{tag.Tag{Name: "a", Value: "2"}},
200 WireVersion: &VersionRange{Min: 6, Max: 21},
201 }
202 var readPrefTestTopology = Topology{
203 Kind: ReplicaSetWithPrimary,
204 Servers: []Server{readPrefTestPrimary, readPrefTestSecondary1, readPrefTestSecondary2},
205 }
206
207 func TestSelector_Sharded(t *testing.T) {
208 t.Parallel()
209
210 subject := readpref.Primary()
211
212 s := Server{
213 Addr: address.Address("localhost:27017"),
214 HeartbeatInterval: time.Duration(10) * time.Second,
215 LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
216 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
217 Kind: Mongos,
218 WireVersion: &VersionRange{Min: 6, Max: 21},
219 }
220 c := Topology{
221 Kind: Sharded,
222 Servers: []Server{s},
223 }
224
225 result, err := ReadPrefSelector(subject).SelectServer(c, c.Servers)
226
227 require.NoError(t, err)
228 require.Len(t, result, 1)
229 require.Equal(t, []Server{s}, result)
230 }
231
232 func BenchmarkLatencySelector(b *testing.B) {
233 for _, bcase := range []struct {
234 name string
235 serversHook func(servers []Server)
236 }{
237 {
238 name: "AllFit",
239 serversHook: func(servers []Server) {},
240 },
241 {
242 name: "AllButOneFit",
243 serversHook: func(servers []Server) {
244 servers[0].AverageRTT = 2 * time.Second
245 },
246 },
247 {
248 name: "HalfFit",
249 serversHook: func(servers []Server) {
250 for i := 0; i < len(servers); i += 2 {
251 servers[i].AverageRTT = 2 * time.Second
252 }
253 },
254 },
255 {
256 name: "OneFit",
257 serversHook: func(servers []Server) {
258 for i := 1; i < len(servers); i++ {
259 servers[i].AverageRTT = 2 * time.Second
260 }
261 },
262 },
263 } {
264 bcase := bcase
265
266 b.Run(bcase.name, func(b *testing.B) {
267 s := Server{
268 Addr: address.Address("localhost:27017"),
269 HeartbeatInterval: time.Duration(10) * time.Second,
270 LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
271 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
272 Kind: Mongos,
273 WireVersion: &VersionRange{Min: 6, Max: 21},
274 AverageRTTSet: true,
275 AverageRTT: time.Second,
276 }
277 servers := make([]Server, 100)
278 for i := 0; i < len(servers); i++ {
279 servers[i] = s
280 }
281 bcase.serversHook(servers)
282
283
284 servers[99].AverageRTT = 500 * time.Millisecond
285 c := Topology{
286 Kind: Sharded,
287 Servers: servers,
288 }
289
290 b.ResetTimer()
291 b.RunParallel(func(p *testing.PB) {
292 b.ReportAllocs()
293 for p.Next() {
294 _, _ = LatencySelector(time.Second).SelectServer(c, c.Servers)
295 }
296 })
297 })
298 }
299 }
300
301 func BenchmarkSelector_Sharded(b *testing.B) {
302 for _, bcase := range []struct {
303 name string
304 serversHook func(servers []Server)
305 }{
306 {
307 name: "AllFit",
308 serversHook: func(servers []Server) {},
309 },
310 {
311 name: "AllButOneFit",
312 serversHook: func(servers []Server) {
313 servers[0].Kind = LoadBalancer
314 },
315 },
316 {
317 name: "HalfFit",
318 serversHook: func(servers []Server) {
319 for i := 0; i < len(servers); i += 2 {
320 servers[i].Kind = LoadBalancer
321 }
322 },
323 },
324 {
325 name: "OneFit",
326 serversHook: func(servers []Server) {
327 for i := 1; i < len(servers); i++ {
328 servers[i].Kind = LoadBalancer
329 }
330 },
331 },
332 } {
333 bcase := bcase
334
335 b.Run(bcase.name, func(b *testing.B) {
336 subject := readpref.Primary()
337
338 s := Server{
339 Addr: address.Address("localhost:27017"),
340 HeartbeatInterval: time.Duration(10) * time.Second,
341 LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
342 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
343 Kind: Mongos,
344 WireVersion: &VersionRange{Min: 6, Max: 21},
345 }
346 servers := make([]Server, 100)
347 for i := 0; i < len(servers); i++ {
348 servers[i] = s
349 }
350 bcase.serversHook(servers)
351 c := Topology{
352 Kind: Sharded,
353 Servers: servers,
354 }
355
356 b.ResetTimer()
357 b.RunParallel(func(p *testing.PB) {
358 b.ReportAllocs()
359 for p.Next() {
360 _, _ = ReadPrefSelector(subject).SelectServer(c, c.Servers)
361 }
362 })
363 })
364 }
365 }
366
367 func Benchmark_SelectServer_SelectServer(b *testing.B) {
368 topology := Topology{Kind: ReplicaSet}
369 candidates := []Server{
370 {Kind: Mongos},
371 {Kind: RSPrimary},
372 {Kind: Standalone},
373 }
374
375 selector := writeServerSelector{}
376
377 b.ReportAllocs()
378 b.ResetTimer()
379
380 for i := 0; i < b.N; i++ {
381 _, err := selector.SelectServer(topology, candidates)
382 if err != nil {
383 b.Fatalf("Error selecting server: %v", err)
384 }
385 }
386 }
387
388 func TestSelector_Single(t *testing.T) {
389 t.Parallel()
390
391 subject := readpref.Primary()
392
393 s := Server{
394 Addr: address.Address("localhost:27017"),
395 HeartbeatInterval: time.Duration(10) * time.Second,
396 LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
397 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
398 Kind: Mongos,
399 WireVersion: &VersionRange{Min: 6, Max: 21},
400 }
401 c := Topology{
402 Kind: Single,
403 Servers: []Server{s},
404 }
405
406 result, err := ReadPrefSelector(subject).SelectServer(c, c.Servers)
407
408 require.NoError(t, err)
409 require.Len(t, result, 1)
410 require.Equal(t, []Server{s}, result)
411 }
412
413 func TestSelector_Primary(t *testing.T) {
414 t.Parallel()
415
416 subject := readpref.Primary()
417
418 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
419
420 require.NoError(t, err)
421 require.Len(t, result, 1)
422 require.Equal(t, []Server{readPrefTestPrimary}, result)
423 }
424
425 func TestSelector_Primary_with_no_primary(t *testing.T) {
426 t.Parallel()
427
428 subject := readpref.Primary()
429
430 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
431
432 require.NoError(t, err)
433 require.Len(t, result, 0)
434 }
435
436 func TestSelector_PrimaryPreferred(t *testing.T) {
437 t.Parallel()
438
439 subject := readpref.PrimaryPreferred()
440
441 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
442
443 require.NoError(t, err)
444 require.Len(t, result, 1)
445 require.Equal(t, []Server{readPrefTestPrimary}, result)
446 }
447
448 func TestSelector_PrimaryPreferred_ignores_tags(t *testing.T) {
449 t.Parallel()
450
451 subject := readpref.PrimaryPreferred(
452 readpref.WithTags("a", "2"),
453 )
454
455 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
456
457 require.NoError(t, err)
458 require.Len(t, result, 1)
459 require.Equal(t, []Server{readPrefTestPrimary}, result)
460 }
461
462 func TestSelector_PrimaryPreferred_with_no_primary(t *testing.T) {
463 t.Parallel()
464
465 subject := readpref.PrimaryPreferred()
466
467 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
468
469 require.NoError(t, err)
470 require.Len(t, result, 2)
471 require.Equal(t, []Server{readPrefTestSecondary1, readPrefTestSecondary2}, result)
472 }
473
474 func TestSelector_PrimaryPreferred_with_no_primary_and_tags(t *testing.T) {
475 t.Parallel()
476
477 subject := readpref.PrimaryPreferred(
478 readpref.WithTags("a", "2"),
479 )
480
481 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
482
483 require.NoError(t, err)
484 require.Len(t, result, 1)
485 require.Equal(t, []Server{readPrefTestSecondary2}, result)
486 }
487
488 func TestSelector_PrimaryPreferred_with_maxStaleness(t *testing.T) {
489 t.Parallel()
490
491 subject := readpref.PrimaryPreferred(
492 readpref.WithMaxStaleness(time.Duration(90) * time.Second),
493 )
494
495 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
496
497 require.NoError(t, err)
498 require.Len(t, result, 1)
499 require.Equal(t, []Server{readPrefTestPrimary}, result)
500 }
501
502 func TestSelector_PrimaryPreferred_with_maxStaleness_and_no_primary(t *testing.T) {
503 t.Parallel()
504
505 subject := readpref.PrimaryPreferred(
506 readpref.WithMaxStaleness(time.Duration(90) * time.Second),
507 )
508
509 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
510
511 require.NoError(t, err)
512 require.Len(t, result, 1)
513 require.Equal(t, []Server{readPrefTestSecondary2}, result)
514 }
515
516 func TestSelector_SecondaryPreferred(t *testing.T) {
517 t.Parallel()
518
519 subject := readpref.SecondaryPreferred()
520
521 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
522
523 require.NoError(t, err)
524 require.Len(t, result, 2)
525 require.Equal(t, []Server{readPrefTestSecondary1, readPrefTestSecondary2}, result)
526 }
527
528 func TestSelector_SecondaryPreferred_with_tags(t *testing.T) {
529 t.Parallel()
530
531 subject := readpref.SecondaryPreferred(
532 readpref.WithTags("a", "2"),
533 )
534
535 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
536
537 require.NoError(t, err)
538 require.Len(t, result, 1)
539 require.Equal(t, []Server{readPrefTestSecondary2}, result)
540 }
541
542 func TestSelector_SecondaryPreferred_with_tags_that_do_not_match(t *testing.T) {
543 t.Parallel()
544
545 subject := readpref.SecondaryPreferred(
546 readpref.WithTags("a", "3"),
547 )
548
549 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
550
551 require.NoError(t, err)
552 require.Len(t, result, 1)
553 require.Equal(t, []Server{readPrefTestPrimary}, result)
554 }
555
556 func TestSelector_SecondaryPreferred_with_tags_that_do_not_match_and_no_primary(t *testing.T) {
557 t.Parallel()
558
559 subject := readpref.SecondaryPreferred(
560 readpref.WithTags("a", "3"),
561 )
562
563 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
564
565 require.NoError(t, err)
566 require.Len(t, result, 0)
567 }
568
569 func TestSelector_SecondaryPreferred_with_no_secondaries(t *testing.T) {
570 t.Parallel()
571
572 subject := readpref.SecondaryPreferred()
573
574 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestPrimary})
575
576 require.NoError(t, err)
577 require.Len(t, result, 1)
578 require.Equal(t, []Server{readPrefTestPrimary}, result)
579 }
580
581 func TestSelector_SecondaryPreferred_with_no_secondaries_or_primary(t *testing.T) {
582 t.Parallel()
583
584 subject := readpref.SecondaryPreferred()
585
586 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{})
587
588 require.NoError(t, err)
589 require.Len(t, result, 0)
590 }
591
592 func TestSelector_SecondaryPreferred_with_maxStaleness(t *testing.T) {
593 t.Parallel()
594
595 subject := readpref.SecondaryPreferred(
596 readpref.WithMaxStaleness(time.Duration(90) * time.Second),
597 )
598
599 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
600
601 require.NoError(t, err)
602 require.Len(t, result, 1)
603 require.Equal(t, []Server{readPrefTestSecondary2}, result)
604 }
605
606 func TestSelector_SecondaryPreferred_with_maxStaleness_and_no_primary(t *testing.T) {
607 t.Parallel()
608
609 subject := readpref.SecondaryPreferred(
610 readpref.WithMaxStaleness(time.Duration(90) * time.Second),
611 )
612
613 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
614
615 require.NoError(t, err)
616 require.Len(t, result, 1)
617 require.Equal(t, []Server{readPrefTestSecondary2}, result)
618 }
619
620 func TestSelector_Secondary(t *testing.T) {
621 t.Parallel()
622
623 subject := readpref.Secondary()
624
625 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
626
627 require.NoError(t, err)
628 require.Len(t, result, 2)
629 require.Equal(t, []Server{readPrefTestSecondary1, readPrefTestSecondary2}, result)
630 }
631
632 func TestSelector_Secondary_with_tags(t *testing.T) {
633 t.Parallel()
634
635 subject := readpref.Secondary(
636 readpref.WithTags("a", "2"),
637 )
638
639 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
640
641 require.NoError(t, err)
642 require.Len(t, result, 1)
643 require.Equal(t, []Server{readPrefTestSecondary2}, result)
644 }
645
646 func TestSelector_Secondary_with_empty_tag_set(t *testing.T) {
647 t.Parallel()
648
649 primaryNoTags := Server{
650 Addr: address.Address("localhost:27017"),
651 Kind: RSPrimary,
652 WireVersion: &VersionRange{Min: 6, Max: 21},
653 }
654 firstSecondaryNoTags := Server{
655 Addr: address.Address("localhost:27018"),
656 Kind: RSSecondary,
657 WireVersion: &VersionRange{Min: 6, Max: 21},
658 }
659 secondSecondaryNoTags := Server{
660 Addr: address.Address("localhost:27019"),
661 Kind: RSSecondary,
662 WireVersion: &VersionRange{Min: 6, Max: 21},
663 }
664 topologyNoTags := Topology{
665 Kind: ReplicaSetWithPrimary,
666 Servers: []Server{primaryNoTags, firstSecondaryNoTags, secondSecondaryNoTags},
667 }
668
669 nonMatchingSet := tag.Set{
670 {Name: "foo", Value: "bar"},
671 }
672 emptyTagSet := tag.Set{}
673 rp := readpref.Secondary(
674 readpref.WithTagSets(nonMatchingSet, emptyTagSet),
675 )
676
677 result, err := ReadPrefSelector(rp).SelectServer(topologyNoTags, topologyNoTags.Servers)
678 assert.Nil(t, err, "SelectServer error: %v", err)
679 expectedResult := []Server{firstSecondaryNoTags, secondSecondaryNoTags}
680 assert.Equal(t, expectedResult, result, "expected result %v, got %v", expectedResult, result)
681 }
682
683 func TestSelector_Secondary_with_tags_that_do_not_match(t *testing.T) {
684 t.Parallel()
685
686 subject := readpref.Secondary(
687 readpref.WithTags("a", "3"),
688 )
689
690 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
691
692 require.NoError(t, err)
693 require.Len(t, result, 0)
694 }
695
696 func TestSelector_Secondary_with_no_secondaries(t *testing.T) {
697 t.Parallel()
698
699 subject := readpref.Secondary()
700
701 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestPrimary})
702
703 require.NoError(t, err)
704 require.Len(t, result, 0)
705 }
706
707 func TestSelector_Secondary_with_maxStaleness(t *testing.T) {
708 t.Parallel()
709
710 subject := readpref.Secondary(
711 readpref.WithMaxStaleness(time.Duration(90) * time.Second),
712 )
713
714 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
715
716 require.NoError(t, err)
717 require.Len(t, result, 1)
718 require.Equal(t, []Server{readPrefTestSecondary2}, result)
719 }
720
721 func TestSelector_Secondary_with_maxStaleness_and_no_primary(t *testing.T) {
722 t.Parallel()
723
724 subject := readpref.Secondary(
725 readpref.WithMaxStaleness(time.Duration(90) * time.Second),
726 )
727
728 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
729
730 require.NoError(t, err)
731 require.Len(t, result, 1)
732 require.Equal(t, []Server{readPrefTestSecondary2}, result)
733 }
734
735 func TestSelector_Nearest(t *testing.T) {
736 t.Parallel()
737
738 subject := readpref.Nearest()
739
740 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
741
742 require.NoError(t, err)
743 require.Len(t, result, 3)
744 require.Equal(t, []Server{readPrefTestPrimary, readPrefTestSecondary1, readPrefTestSecondary2}, result)
745 }
746
747 func TestSelector_Nearest_with_tags(t *testing.T) {
748 t.Parallel()
749
750 subject := readpref.Nearest(
751 readpref.WithTags("a", "1"),
752 )
753
754 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
755
756 require.NoError(t, err)
757 require.Len(t, result, 2)
758 require.Equal(t, []Server{readPrefTestPrimary, readPrefTestSecondary1}, result)
759 }
760
761 func TestSelector_Nearest_with_tags_that_do_not_match(t *testing.T) {
762 t.Parallel()
763
764 subject := readpref.Nearest(
765 readpref.WithTags("a", "3"),
766 )
767
768 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
769
770 require.NoError(t, err)
771 require.Len(t, result, 0)
772 }
773
774 func TestSelector_Nearest_with_no_primary(t *testing.T) {
775 t.Parallel()
776
777 subject := readpref.Nearest()
778
779 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
780
781 require.NoError(t, err)
782 require.Len(t, result, 2)
783 require.Equal(t, []Server{readPrefTestSecondary1, readPrefTestSecondary2}, result)
784 }
785
786 func TestSelector_Nearest_with_no_secondaries(t *testing.T) {
787 t.Parallel()
788
789 subject := readpref.Nearest()
790
791 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestPrimary})
792
793 require.NoError(t, err)
794 require.Len(t, result, 1)
795 require.Equal(t, []Server{readPrefTestPrimary}, result)
796 }
797
798 func TestSelector_Nearest_with_maxStaleness(t *testing.T) {
799 t.Parallel()
800
801 subject := readpref.Nearest(
802 readpref.WithMaxStaleness(time.Duration(90) * time.Second),
803 )
804
805 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
806
807 require.NoError(t, err)
808 require.Len(t, result, 2)
809 require.Equal(t, []Server{readPrefTestPrimary, readPrefTestSecondary2}, result)
810 }
811
812 func TestSelector_Nearest_with_maxStaleness_and_no_primary(t *testing.T) {
813 t.Parallel()
814
815 subject := readpref.Nearest(
816 readpref.WithMaxStaleness(time.Duration(90) * time.Second),
817 )
818
819 result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
820
821 require.NoError(t, err)
822 require.Len(t, result, 1)
823 require.Equal(t, []Server{readPrefTestSecondary2}, result)
824 }
825
826 func TestSelector_Max_staleness_is_less_than_90_seconds(t *testing.T) {
827 t.Parallel()
828
829 subject := readpref.Nearest(
830 readpref.WithMaxStaleness(time.Duration(50) * time.Second),
831 )
832
833 s := Server{
834 Addr: address.Address("localhost:27017"),
835 HeartbeatInterval: time.Duration(10) * time.Second,
836 LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
837 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
838 Kind: RSPrimary,
839 WireVersion: &VersionRange{Min: 6, Max: 21},
840 }
841 c := Topology{
842 Kind: ReplicaSetWithPrimary,
843 Servers: []Server{s},
844 }
845
846 _, err := ReadPrefSelector(subject).SelectServer(c, c.Servers)
847
848 require.Error(t, err)
849 }
850
851 func TestSelector_Max_staleness_is_too_low(t *testing.T) {
852 t.Parallel()
853
854 subject := readpref.Nearest(
855 readpref.WithMaxStaleness(time.Duration(100) * time.Second),
856 )
857
858 s := Server{
859 Addr: address.Address("localhost:27017"),
860 HeartbeatInterval: time.Duration(100) * time.Second,
861 LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
862 LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
863 Kind: RSPrimary,
864 WireVersion: &VersionRange{Min: 6, Max: 21},
865 }
866 c := Topology{
867 Kind: ReplicaSetWithPrimary,
868 Servers: []Server{s},
869 }
870
871 _, err := ReadPrefSelector(subject).SelectServer(c, c.Servers)
872
873 require.Error(t, err)
874 }
875
View as plain text