...

Source file src/go.mongodb.org/mongo-driver/mongo/description/selector_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/description

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package description
     8  
     9  import (
    10  	"testing"
    11  	"time"
    12  
    13  	"github.com/google/go-cmp/cmp"
    14  	"go.mongodb.org/mongo-driver/internal/assert"
    15  	"go.mongodb.org/mongo-driver/internal/require"
    16  	"go.mongodb.org/mongo-driver/mongo/address"
    17  	"go.mongodb.org/mongo-driver/mongo/readpref"
    18  	"go.mongodb.org/mongo-driver/tag"
    19  )
    20  
    21  func TestServerSelection(t *testing.T) {
    22  	noerr := func(t *testing.T, err error) {
    23  		if err != nil {
    24  			t.Errorf("Unepexted error: %v", err)
    25  			t.FailNow()
    26  		}
    27  	}
    28  
    29  	t.Run("WriteSelector", func(t *testing.T) {
    30  		testCases := []struct {
    31  			name  string
    32  			desc  Topology
    33  			start int
    34  			end   int
    35  		}{
    36  			{
    37  				name: "ReplicaSetWithPrimary",
    38  				desc: Topology{
    39  					Kind: ReplicaSetWithPrimary,
    40  					Servers: []Server{
    41  						{Addr: address.Address("localhost:27017"), Kind: RSPrimary},
    42  						{Addr: address.Address("localhost:27018"), Kind: RSSecondary},
    43  						{Addr: address.Address("localhost:27019"), Kind: RSSecondary},
    44  					},
    45  				},
    46  				start: 0,
    47  				end:   1,
    48  			},
    49  			{
    50  				name: "ReplicaSetNoPrimary",
    51  				desc: Topology{
    52  					Kind: ReplicaSetNoPrimary,
    53  					Servers: []Server{
    54  						{Addr: address.Address("localhost:27018"), Kind: RSSecondary},
    55  						{Addr: address.Address("localhost:27019"), Kind: RSSecondary},
    56  					},
    57  				},
    58  				start: 0,
    59  				end:   0,
    60  			},
    61  			{
    62  				name: "Sharded",
    63  				desc: Topology{
    64  					Kind: Sharded,
    65  					Servers: []Server{
    66  						{Addr: address.Address("localhost:27018"), Kind: Mongos},
    67  						{Addr: address.Address("localhost:27019"), Kind: Mongos},
    68  					},
    69  				},
    70  				start: 0,
    71  				end:   2,
    72  			},
    73  			{
    74  				name: "Single",
    75  				desc: Topology{
    76  					Kind: Single,
    77  					Servers: []Server{
    78  						{Addr: address.Address("localhost:27018"), Kind: Standalone},
    79  					},
    80  				},
    81  				start: 0,
    82  				end:   1,
    83  			},
    84  		}
    85  
    86  		for _, tc := range testCases {
    87  			t.Run(tc.name, func(t *testing.T) {
    88  				result, err := WriteSelector().SelectServer(tc.desc, tc.desc.Servers)
    89  				noerr(t, err)
    90  				if len(result) != tc.end-tc.start {
    91  					t.Errorf("Incorrect number of servers selected. got %d; want %d", len(result), tc.end-tc.start)
    92  				}
    93  				if diff := cmp.Diff(result, tc.desc.Servers[tc.start:tc.end]); diff != "" {
    94  					t.Errorf("Incorrect servers selected (-got +want):\n%s", diff)
    95  				}
    96  			})
    97  		}
    98  	})
    99  	t.Run("LatencySelector", func(t *testing.T) {
   100  		testCases := []struct {
   101  			name  string
   102  			desc  Topology
   103  			start int
   104  			end   int
   105  		}{
   106  			{
   107  				name: "NoRTTSet",
   108  				desc: Topology{
   109  					Servers: []Server{
   110  						{Addr: address.Address("localhost:27017")},
   111  						{Addr: address.Address("localhost:27018")},
   112  						{Addr: address.Address("localhost:27019")},
   113  					},
   114  				},
   115  				start: 0,
   116  				end:   3,
   117  			},
   118  			{
   119  				name: "MultipleServers PartialNoRTTSet",
   120  				desc: Topology{
   121  					Servers: []Server{
   122  						{Addr: address.Address("localhost:27017"), AverageRTT: 5 * time.Second, AverageRTTSet: true},
   123  						{Addr: address.Address("localhost:27018"), AverageRTT: 10 * time.Second, AverageRTTSet: true},
   124  						{Addr: address.Address("localhost:27019")},
   125  					},
   126  				},
   127  				start: 0,
   128  				end:   2,
   129  			},
   130  			{
   131  				name: "MultipleServers",
   132  				desc: Topology{
   133  					Servers: []Server{
   134  						{Addr: address.Address("localhost:27017"), AverageRTT: 5 * time.Second, AverageRTTSet: true},
   135  						{Addr: address.Address("localhost:27018"), AverageRTT: 10 * time.Second, AverageRTTSet: true},
   136  						{Addr: address.Address("localhost:27019"), AverageRTT: 26 * time.Second, AverageRTTSet: true},
   137  					},
   138  				},
   139  				start: 0,
   140  				end:   2,
   141  			},
   142  			{
   143  				name:  "No Servers",
   144  				desc:  Topology{Servers: []Server{}},
   145  				start: 0,
   146  				end:   0,
   147  			},
   148  			{
   149  				name: "1 Server",
   150  				desc: Topology{
   151  					Servers: []Server{
   152  						{Addr: address.Address("localhost:27017"), AverageRTT: 26 * time.Second, AverageRTTSet: true},
   153  					},
   154  				},
   155  				start: 0,
   156  				end:   1,
   157  			},
   158  		}
   159  
   160  		for _, tc := range testCases {
   161  			t.Run(tc.name, func(t *testing.T) {
   162  				result, err := LatencySelector(20*time.Second).SelectServer(tc.desc, tc.desc.Servers)
   163  				noerr(t, err)
   164  				if len(result) != tc.end-tc.start {
   165  					t.Errorf("Incorrect number of servers selected. got %d; want %d", len(result), tc.end-tc.start)
   166  				}
   167  				if diff := cmp.Diff(result, tc.desc.Servers[tc.start:tc.end]); diff != "" {
   168  					t.Errorf("Incorrect servers selected (-got +want):\n%s", diff)
   169  				}
   170  			})
   171  		}
   172  	})
   173  }
   174  
   175  var readPrefTestPrimary = Server{
   176  	Addr:              address.Address("localhost:27017"),
   177  	HeartbeatInterval: time.Duration(10) * time.Second,
   178  	LastWriteTime:     time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
   179  	LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   180  	Kind:              RSPrimary,
   181  	Tags:              tag.Set{tag.Tag{Name: "a", Value: "1"}},
   182  	WireVersion:       &VersionRange{Min: 6, Max: 21},
   183  }
   184  var readPrefTestSecondary1 = Server{
   185  	Addr:              address.Address("localhost:27018"),
   186  	HeartbeatInterval: time.Duration(10) * time.Second,
   187  	LastWriteTime:     time.Date(2017, 2, 11, 13, 58, 0, 0, time.UTC),
   188  	LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   189  	Kind:              RSSecondary,
   190  	Tags:              tag.Set{tag.Tag{Name: "a", Value: "1"}},
   191  	WireVersion:       &VersionRange{Min: 6, Max: 21},
   192  }
   193  var readPrefTestSecondary2 = Server{
   194  	Addr:              address.Address("localhost:27018"),
   195  	HeartbeatInterval: time.Duration(10) * time.Second,
   196  	LastWriteTime:     time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
   197  	LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   198  	Kind:              RSSecondary,
   199  	Tags:              tag.Set{tag.Tag{Name: "a", Value: "2"}},
   200  	WireVersion:       &VersionRange{Min: 6, Max: 21},
   201  }
   202  var readPrefTestTopology = Topology{
   203  	Kind:    ReplicaSetWithPrimary,
   204  	Servers: []Server{readPrefTestPrimary, readPrefTestSecondary1, readPrefTestSecondary2},
   205  }
   206  
   207  func TestSelector_Sharded(t *testing.T) {
   208  	t.Parallel()
   209  
   210  	subject := readpref.Primary()
   211  
   212  	s := Server{
   213  		Addr:              address.Address("localhost:27017"),
   214  		HeartbeatInterval: time.Duration(10) * time.Second,
   215  		LastWriteTime:     time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
   216  		LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   217  		Kind:              Mongos,
   218  		WireVersion:       &VersionRange{Min: 6, Max: 21},
   219  	}
   220  	c := Topology{
   221  		Kind:    Sharded,
   222  		Servers: []Server{s},
   223  	}
   224  
   225  	result, err := ReadPrefSelector(subject).SelectServer(c, c.Servers)
   226  
   227  	require.NoError(t, err)
   228  	require.Len(t, result, 1)
   229  	require.Equal(t, []Server{s}, result)
   230  }
   231  
   232  func BenchmarkLatencySelector(b *testing.B) {
   233  	for _, bcase := range []struct {
   234  		name        string
   235  		serversHook func(servers []Server)
   236  	}{
   237  		{
   238  			name:        "AllFit",
   239  			serversHook: func(servers []Server) {},
   240  		},
   241  		{
   242  			name: "AllButOneFit",
   243  			serversHook: func(servers []Server) {
   244  				servers[0].AverageRTT = 2 * time.Second
   245  			},
   246  		},
   247  		{
   248  			name: "HalfFit",
   249  			serversHook: func(servers []Server) {
   250  				for i := 0; i < len(servers); i += 2 {
   251  					servers[i].AverageRTT = 2 * time.Second
   252  				}
   253  			},
   254  		},
   255  		{
   256  			name: "OneFit",
   257  			serversHook: func(servers []Server) {
   258  				for i := 1; i < len(servers); i++ {
   259  					servers[i].AverageRTT = 2 * time.Second
   260  				}
   261  			},
   262  		},
   263  	} {
   264  		bcase := bcase
   265  
   266  		b.Run(bcase.name, func(b *testing.B) {
   267  			s := Server{
   268  				Addr:              address.Address("localhost:27017"),
   269  				HeartbeatInterval: time.Duration(10) * time.Second,
   270  				LastWriteTime:     time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
   271  				LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   272  				Kind:              Mongos,
   273  				WireVersion:       &VersionRange{Min: 6, Max: 21},
   274  				AverageRTTSet:     true,
   275  				AverageRTT:        time.Second,
   276  			}
   277  			servers := make([]Server, 100)
   278  			for i := 0; i < len(servers); i++ {
   279  				servers[i] = s
   280  			}
   281  			bcase.serversHook(servers)
   282  			//this will make base 1 sec latency < min (0.5) + conf (1)
   283  			//and high latency 2 higher than the threshold
   284  			servers[99].AverageRTT = 500 * time.Millisecond
   285  			c := Topology{
   286  				Kind:    Sharded,
   287  				Servers: servers,
   288  			}
   289  
   290  			b.ResetTimer()
   291  			b.RunParallel(func(p *testing.PB) {
   292  				b.ReportAllocs()
   293  				for p.Next() {
   294  					_, _ = LatencySelector(time.Second).SelectServer(c, c.Servers)
   295  				}
   296  			})
   297  		})
   298  	}
   299  }
   300  
   301  func BenchmarkSelector_Sharded(b *testing.B) {
   302  	for _, bcase := range []struct {
   303  		name        string
   304  		serversHook func(servers []Server)
   305  	}{
   306  		{
   307  			name:        "AllFit",
   308  			serversHook: func(servers []Server) {},
   309  		},
   310  		{
   311  			name: "AllButOneFit",
   312  			serversHook: func(servers []Server) {
   313  				servers[0].Kind = LoadBalancer
   314  			},
   315  		},
   316  		{
   317  			name: "HalfFit",
   318  			serversHook: func(servers []Server) {
   319  				for i := 0; i < len(servers); i += 2 {
   320  					servers[i].Kind = LoadBalancer
   321  				}
   322  			},
   323  		},
   324  		{
   325  			name: "OneFit",
   326  			serversHook: func(servers []Server) {
   327  				for i := 1; i < len(servers); i++ {
   328  					servers[i].Kind = LoadBalancer
   329  				}
   330  			},
   331  		},
   332  	} {
   333  		bcase := bcase
   334  
   335  		b.Run(bcase.name, func(b *testing.B) {
   336  			subject := readpref.Primary()
   337  
   338  			s := Server{
   339  				Addr:              address.Address("localhost:27017"),
   340  				HeartbeatInterval: time.Duration(10) * time.Second,
   341  				LastWriteTime:     time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
   342  				LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   343  				Kind:              Mongos,
   344  				WireVersion:       &VersionRange{Min: 6, Max: 21},
   345  			}
   346  			servers := make([]Server, 100)
   347  			for i := 0; i < len(servers); i++ {
   348  				servers[i] = s
   349  			}
   350  			bcase.serversHook(servers)
   351  			c := Topology{
   352  				Kind:    Sharded,
   353  				Servers: servers,
   354  			}
   355  
   356  			b.ResetTimer()
   357  			b.RunParallel(func(p *testing.PB) {
   358  				b.ReportAllocs()
   359  				for p.Next() {
   360  					_, _ = ReadPrefSelector(subject).SelectServer(c, c.Servers)
   361  				}
   362  			})
   363  		})
   364  	}
   365  }
   366  
   367  func Benchmark_SelectServer_SelectServer(b *testing.B) {
   368  	topology := Topology{Kind: ReplicaSet} // You can change the topology as needed
   369  	candidates := []Server{
   370  		{Kind: Mongos},
   371  		{Kind: RSPrimary},
   372  		{Kind: Standalone},
   373  	}
   374  
   375  	selector := writeServerSelector{} // Assuming this is the receiver type
   376  
   377  	b.ReportAllocs()
   378  	b.ResetTimer()
   379  
   380  	for i := 0; i < b.N; i++ {
   381  		_, err := selector.SelectServer(topology, candidates)
   382  		if err != nil {
   383  			b.Fatalf("Error selecting server: %v", err)
   384  		}
   385  	}
   386  }
   387  
   388  func TestSelector_Single(t *testing.T) {
   389  	t.Parallel()
   390  
   391  	subject := readpref.Primary()
   392  
   393  	s := Server{
   394  		Addr:              address.Address("localhost:27017"),
   395  		HeartbeatInterval: time.Duration(10) * time.Second,
   396  		LastWriteTime:     time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
   397  		LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   398  		Kind:              Mongos,
   399  		WireVersion:       &VersionRange{Min: 6, Max: 21},
   400  	}
   401  	c := Topology{
   402  		Kind:    Single,
   403  		Servers: []Server{s},
   404  	}
   405  
   406  	result, err := ReadPrefSelector(subject).SelectServer(c, c.Servers)
   407  
   408  	require.NoError(t, err)
   409  	require.Len(t, result, 1)
   410  	require.Equal(t, []Server{s}, result)
   411  }
   412  
   413  func TestSelector_Primary(t *testing.T) {
   414  	t.Parallel()
   415  
   416  	subject := readpref.Primary()
   417  
   418  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   419  
   420  	require.NoError(t, err)
   421  	require.Len(t, result, 1)
   422  	require.Equal(t, []Server{readPrefTestPrimary}, result)
   423  }
   424  
   425  func TestSelector_Primary_with_no_primary(t *testing.T) {
   426  	t.Parallel()
   427  
   428  	subject := readpref.Primary()
   429  
   430  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   431  
   432  	require.NoError(t, err)
   433  	require.Len(t, result, 0)
   434  }
   435  
   436  func TestSelector_PrimaryPreferred(t *testing.T) {
   437  	t.Parallel()
   438  
   439  	subject := readpref.PrimaryPreferred()
   440  
   441  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   442  
   443  	require.NoError(t, err)
   444  	require.Len(t, result, 1)
   445  	require.Equal(t, []Server{readPrefTestPrimary}, result)
   446  }
   447  
   448  func TestSelector_PrimaryPreferred_ignores_tags(t *testing.T) {
   449  	t.Parallel()
   450  
   451  	subject := readpref.PrimaryPreferred(
   452  		readpref.WithTags("a", "2"),
   453  	)
   454  
   455  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   456  
   457  	require.NoError(t, err)
   458  	require.Len(t, result, 1)
   459  	require.Equal(t, []Server{readPrefTestPrimary}, result)
   460  }
   461  
   462  func TestSelector_PrimaryPreferred_with_no_primary(t *testing.T) {
   463  	t.Parallel()
   464  
   465  	subject := readpref.PrimaryPreferred()
   466  
   467  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   468  
   469  	require.NoError(t, err)
   470  	require.Len(t, result, 2)
   471  	require.Equal(t, []Server{readPrefTestSecondary1, readPrefTestSecondary2}, result)
   472  }
   473  
   474  func TestSelector_PrimaryPreferred_with_no_primary_and_tags(t *testing.T) {
   475  	t.Parallel()
   476  
   477  	subject := readpref.PrimaryPreferred(
   478  		readpref.WithTags("a", "2"),
   479  	)
   480  
   481  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   482  
   483  	require.NoError(t, err)
   484  	require.Len(t, result, 1)
   485  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   486  }
   487  
   488  func TestSelector_PrimaryPreferred_with_maxStaleness(t *testing.T) {
   489  	t.Parallel()
   490  
   491  	subject := readpref.PrimaryPreferred(
   492  		readpref.WithMaxStaleness(time.Duration(90) * time.Second),
   493  	)
   494  
   495  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   496  
   497  	require.NoError(t, err)
   498  	require.Len(t, result, 1)
   499  	require.Equal(t, []Server{readPrefTestPrimary}, result)
   500  }
   501  
   502  func TestSelector_PrimaryPreferred_with_maxStaleness_and_no_primary(t *testing.T) {
   503  	t.Parallel()
   504  
   505  	subject := readpref.PrimaryPreferred(
   506  		readpref.WithMaxStaleness(time.Duration(90) * time.Second),
   507  	)
   508  
   509  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   510  
   511  	require.NoError(t, err)
   512  	require.Len(t, result, 1)
   513  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   514  }
   515  
   516  func TestSelector_SecondaryPreferred(t *testing.T) {
   517  	t.Parallel()
   518  
   519  	subject := readpref.SecondaryPreferred()
   520  
   521  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   522  
   523  	require.NoError(t, err)
   524  	require.Len(t, result, 2)
   525  	require.Equal(t, []Server{readPrefTestSecondary1, readPrefTestSecondary2}, result)
   526  }
   527  
   528  func TestSelector_SecondaryPreferred_with_tags(t *testing.T) {
   529  	t.Parallel()
   530  
   531  	subject := readpref.SecondaryPreferred(
   532  		readpref.WithTags("a", "2"),
   533  	)
   534  
   535  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   536  
   537  	require.NoError(t, err)
   538  	require.Len(t, result, 1)
   539  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   540  }
   541  
   542  func TestSelector_SecondaryPreferred_with_tags_that_do_not_match(t *testing.T) {
   543  	t.Parallel()
   544  
   545  	subject := readpref.SecondaryPreferred(
   546  		readpref.WithTags("a", "3"),
   547  	)
   548  
   549  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   550  
   551  	require.NoError(t, err)
   552  	require.Len(t, result, 1)
   553  	require.Equal(t, []Server{readPrefTestPrimary}, result)
   554  }
   555  
   556  func TestSelector_SecondaryPreferred_with_tags_that_do_not_match_and_no_primary(t *testing.T) {
   557  	t.Parallel()
   558  
   559  	subject := readpref.SecondaryPreferred(
   560  		readpref.WithTags("a", "3"),
   561  	)
   562  
   563  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   564  
   565  	require.NoError(t, err)
   566  	require.Len(t, result, 0)
   567  }
   568  
   569  func TestSelector_SecondaryPreferred_with_no_secondaries(t *testing.T) {
   570  	t.Parallel()
   571  
   572  	subject := readpref.SecondaryPreferred()
   573  
   574  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestPrimary})
   575  
   576  	require.NoError(t, err)
   577  	require.Len(t, result, 1)
   578  	require.Equal(t, []Server{readPrefTestPrimary}, result)
   579  }
   580  
   581  func TestSelector_SecondaryPreferred_with_no_secondaries_or_primary(t *testing.T) {
   582  	t.Parallel()
   583  
   584  	subject := readpref.SecondaryPreferred()
   585  
   586  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{})
   587  
   588  	require.NoError(t, err)
   589  	require.Len(t, result, 0)
   590  }
   591  
   592  func TestSelector_SecondaryPreferred_with_maxStaleness(t *testing.T) {
   593  	t.Parallel()
   594  
   595  	subject := readpref.SecondaryPreferred(
   596  		readpref.WithMaxStaleness(time.Duration(90) * time.Second),
   597  	)
   598  
   599  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   600  
   601  	require.NoError(t, err)
   602  	require.Len(t, result, 1)
   603  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   604  }
   605  
   606  func TestSelector_SecondaryPreferred_with_maxStaleness_and_no_primary(t *testing.T) {
   607  	t.Parallel()
   608  
   609  	subject := readpref.SecondaryPreferred(
   610  		readpref.WithMaxStaleness(time.Duration(90) * time.Second),
   611  	)
   612  
   613  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   614  
   615  	require.NoError(t, err)
   616  	require.Len(t, result, 1)
   617  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   618  }
   619  
   620  func TestSelector_Secondary(t *testing.T) {
   621  	t.Parallel()
   622  
   623  	subject := readpref.Secondary()
   624  
   625  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   626  
   627  	require.NoError(t, err)
   628  	require.Len(t, result, 2)
   629  	require.Equal(t, []Server{readPrefTestSecondary1, readPrefTestSecondary2}, result)
   630  }
   631  
   632  func TestSelector_Secondary_with_tags(t *testing.T) {
   633  	t.Parallel()
   634  
   635  	subject := readpref.Secondary(
   636  		readpref.WithTags("a", "2"),
   637  	)
   638  
   639  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   640  
   641  	require.NoError(t, err)
   642  	require.Len(t, result, 1)
   643  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   644  }
   645  
   646  func TestSelector_Secondary_with_empty_tag_set(t *testing.T) {
   647  	t.Parallel()
   648  
   649  	primaryNoTags := Server{
   650  		Addr:        address.Address("localhost:27017"),
   651  		Kind:        RSPrimary,
   652  		WireVersion: &VersionRange{Min: 6, Max: 21},
   653  	}
   654  	firstSecondaryNoTags := Server{
   655  		Addr:        address.Address("localhost:27018"),
   656  		Kind:        RSSecondary,
   657  		WireVersion: &VersionRange{Min: 6, Max: 21},
   658  	}
   659  	secondSecondaryNoTags := Server{
   660  		Addr:        address.Address("localhost:27019"),
   661  		Kind:        RSSecondary,
   662  		WireVersion: &VersionRange{Min: 6, Max: 21},
   663  	}
   664  	topologyNoTags := Topology{
   665  		Kind:    ReplicaSetWithPrimary,
   666  		Servers: []Server{primaryNoTags, firstSecondaryNoTags, secondSecondaryNoTags},
   667  	}
   668  
   669  	nonMatchingSet := tag.Set{
   670  		{Name: "foo", Value: "bar"},
   671  	}
   672  	emptyTagSet := tag.Set{}
   673  	rp := readpref.Secondary(
   674  		readpref.WithTagSets(nonMatchingSet, emptyTagSet),
   675  	)
   676  
   677  	result, err := ReadPrefSelector(rp).SelectServer(topologyNoTags, topologyNoTags.Servers)
   678  	assert.Nil(t, err, "SelectServer error: %v", err)
   679  	expectedResult := []Server{firstSecondaryNoTags, secondSecondaryNoTags}
   680  	assert.Equal(t, expectedResult, result, "expected result %v, got %v", expectedResult, result)
   681  }
   682  
   683  func TestSelector_Secondary_with_tags_that_do_not_match(t *testing.T) {
   684  	t.Parallel()
   685  
   686  	subject := readpref.Secondary(
   687  		readpref.WithTags("a", "3"),
   688  	)
   689  
   690  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   691  
   692  	require.NoError(t, err)
   693  	require.Len(t, result, 0)
   694  }
   695  
   696  func TestSelector_Secondary_with_no_secondaries(t *testing.T) {
   697  	t.Parallel()
   698  
   699  	subject := readpref.Secondary()
   700  
   701  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestPrimary})
   702  
   703  	require.NoError(t, err)
   704  	require.Len(t, result, 0)
   705  }
   706  
   707  func TestSelector_Secondary_with_maxStaleness(t *testing.T) {
   708  	t.Parallel()
   709  
   710  	subject := readpref.Secondary(
   711  		readpref.WithMaxStaleness(time.Duration(90) * time.Second),
   712  	)
   713  
   714  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   715  
   716  	require.NoError(t, err)
   717  	require.Len(t, result, 1)
   718  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   719  }
   720  
   721  func TestSelector_Secondary_with_maxStaleness_and_no_primary(t *testing.T) {
   722  	t.Parallel()
   723  
   724  	subject := readpref.Secondary(
   725  		readpref.WithMaxStaleness(time.Duration(90) * time.Second),
   726  	)
   727  
   728  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   729  
   730  	require.NoError(t, err)
   731  	require.Len(t, result, 1)
   732  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   733  }
   734  
   735  func TestSelector_Nearest(t *testing.T) {
   736  	t.Parallel()
   737  
   738  	subject := readpref.Nearest()
   739  
   740  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   741  
   742  	require.NoError(t, err)
   743  	require.Len(t, result, 3)
   744  	require.Equal(t, []Server{readPrefTestPrimary, readPrefTestSecondary1, readPrefTestSecondary2}, result)
   745  }
   746  
   747  func TestSelector_Nearest_with_tags(t *testing.T) {
   748  	t.Parallel()
   749  
   750  	subject := readpref.Nearest(
   751  		readpref.WithTags("a", "1"),
   752  	)
   753  
   754  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   755  
   756  	require.NoError(t, err)
   757  	require.Len(t, result, 2)
   758  	require.Equal(t, []Server{readPrefTestPrimary, readPrefTestSecondary1}, result)
   759  }
   760  
   761  func TestSelector_Nearest_with_tags_that_do_not_match(t *testing.T) {
   762  	t.Parallel()
   763  
   764  	subject := readpref.Nearest(
   765  		readpref.WithTags("a", "3"),
   766  	)
   767  
   768  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   769  
   770  	require.NoError(t, err)
   771  	require.Len(t, result, 0)
   772  }
   773  
   774  func TestSelector_Nearest_with_no_primary(t *testing.T) {
   775  	t.Parallel()
   776  
   777  	subject := readpref.Nearest()
   778  
   779  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   780  
   781  	require.NoError(t, err)
   782  	require.Len(t, result, 2)
   783  	require.Equal(t, []Server{readPrefTestSecondary1, readPrefTestSecondary2}, result)
   784  }
   785  
   786  func TestSelector_Nearest_with_no_secondaries(t *testing.T) {
   787  	t.Parallel()
   788  
   789  	subject := readpref.Nearest()
   790  
   791  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestPrimary})
   792  
   793  	require.NoError(t, err)
   794  	require.Len(t, result, 1)
   795  	require.Equal(t, []Server{readPrefTestPrimary}, result)
   796  }
   797  
   798  func TestSelector_Nearest_with_maxStaleness(t *testing.T) {
   799  	t.Parallel()
   800  
   801  	subject := readpref.Nearest(
   802  		readpref.WithMaxStaleness(time.Duration(90) * time.Second),
   803  	)
   804  
   805  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
   806  
   807  	require.NoError(t, err)
   808  	require.Len(t, result, 2)
   809  	require.Equal(t, []Server{readPrefTestPrimary, readPrefTestSecondary2}, result)
   810  }
   811  
   812  func TestSelector_Nearest_with_maxStaleness_and_no_primary(t *testing.T) {
   813  	t.Parallel()
   814  
   815  	subject := readpref.Nearest(
   816  		readpref.WithMaxStaleness(time.Duration(90) * time.Second),
   817  	)
   818  
   819  	result, err := ReadPrefSelector(subject).SelectServer(readPrefTestTopology, []Server{readPrefTestSecondary1, readPrefTestSecondary2})
   820  
   821  	require.NoError(t, err)
   822  	require.Len(t, result, 1)
   823  	require.Equal(t, []Server{readPrefTestSecondary2}, result)
   824  }
   825  
   826  func TestSelector_Max_staleness_is_less_than_90_seconds(t *testing.T) {
   827  	t.Parallel()
   828  
   829  	subject := readpref.Nearest(
   830  		readpref.WithMaxStaleness(time.Duration(50) * time.Second),
   831  	)
   832  
   833  	s := Server{
   834  		Addr:              address.Address("localhost:27017"),
   835  		HeartbeatInterval: time.Duration(10) * time.Second,
   836  		LastWriteTime:     time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
   837  		LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   838  		Kind:              RSPrimary,
   839  		WireVersion:       &VersionRange{Min: 6, Max: 21},
   840  	}
   841  	c := Topology{
   842  		Kind:    ReplicaSetWithPrimary,
   843  		Servers: []Server{s},
   844  	}
   845  
   846  	_, err := ReadPrefSelector(subject).SelectServer(c, c.Servers)
   847  
   848  	require.Error(t, err)
   849  }
   850  
   851  func TestSelector_Max_staleness_is_too_low(t *testing.T) {
   852  	t.Parallel()
   853  
   854  	subject := readpref.Nearest(
   855  		readpref.WithMaxStaleness(time.Duration(100) * time.Second),
   856  	)
   857  
   858  	s := Server{
   859  		Addr:              address.Address("localhost:27017"),
   860  		HeartbeatInterval: time.Duration(100) * time.Second,
   861  		LastWriteTime:     time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
   862  		LastUpdateTime:    time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
   863  		Kind:              RSPrimary,
   864  		WireVersion:       &VersionRange{Min: 6, Max: 21},
   865  	}
   866  	c := Topology{
   867  		Kind:    ReplicaSetWithPrimary,
   868  		Servers: []Server{s},
   869  	}
   870  
   871  	_, err := ReadPrefSelector(subject).SelectServer(c, c.Servers)
   872  
   873  	require.Error(t, err)
   874  }
   875  

View as plain text