...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/sdam_spec_test.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/topology

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package topology
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"io/ioutil"
    13  	"net"
    14  	"path"
    15  	"sync"
    16  	"testing"
    17  	"time"
    18  
    19  	"go.mongodb.org/mongo-driver/bson"
    20  	"go.mongodb.org/mongo-driver/bson/primitive"
    21  	"go.mongodb.org/mongo-driver/event"
    22  	"go.mongodb.org/mongo-driver/internal/assert"
    23  	"go.mongodb.org/mongo-driver/internal/spectest"
    24  	"go.mongodb.org/mongo-driver/mongo/address"
    25  	"go.mongodb.org/mongo-driver/mongo/description"
    26  	"go.mongodb.org/mongo-driver/mongo/options"
    27  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    28  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    29  )
    30  
    31  type response struct {
    32  	Host  string
    33  	Hello Hello
    34  }
    35  
    36  type Hello struct {
    37  	Arbiters                     []string           `bson:"arbiters,omitempty"`
    38  	ArbiterOnly                  bool               `bson:"arbiterOnly,omitempty"`
    39  	ClusterTime                  bson.Raw           `bson:"$clusterTime,omitempty"`
    40  	Compression                  []string           `bson:"compression,omitempty"`
    41  	ElectionID                   primitive.ObjectID `bson:"electionId,omitempty"`
    42  	Hidden                       bool               `bson:"hidden,omitempty"`
    43  	Hosts                        []string           `bson:"hosts,omitempty"`
    44  	HelloOK                      bool               `bson:"helloOk,omitempty"`
    45  	IsWritablePrimary            bool               `bson:"isWritablePrimary,omitempty"`
    46  	IsReplicaSet                 bool               `bson:"isreplicaset,omitempty"`
    47  	LastWrite                    *lastWriteDate     `bson:"lastWrite,omitempty"`
    48  	LogicalSessionTimeoutMinutes uint32             `bson:"logicalSessionTimeoutMinutes,omitempty"`
    49  	MaxBSONObjectSize            uint32             `bson:"maxBsonObjectSize,omitempty"`
    50  	MaxMessageSizeBytes          uint32             `bson:"maxMessageSizeBytes,omitempty"`
    51  	MaxWriteBatchSize            uint32             `bson:"maxWriteBatchSize,omitempty"`
    52  	Me                           string             `bson:"me,omitempty"`
    53  	MaxWireVersion               int32              `bson:"maxWireVersion,omitempty"`
    54  	MinWireVersion               int32              `bson:"minWireVersion,omitempty"`
    55  	Msg                          string             `bson:"msg,omitempty"`
    56  	OK                           int32              `bson:"ok"`
    57  	Passives                     []string           `bson:"passives,omitempty"`
    58  	Primary                      string             `bson:"primary,omitempty"`
    59  	ReadOnly                     bool               `bson:"readOnly,omitempty"`
    60  	SaslSupportedMechs           []string           `bson:"saslSupportedMechs,omitempty"`
    61  	Secondary                    bool               `bson:"secondary,omitempty"`
    62  	SetName                      string             `bson:"setName,omitempty"`
    63  	SetVersion                   uint32             `bson:"setVersion,omitempty"`
    64  	Tags                         map[string]string  `bson:"tags,omitempty"`
    65  	TopologyVersion              *topologyVersion   `bson:"topologyVersion,omitempty"`
    66  }
    67  
    68  type lastWriteDate struct {
    69  	LastWriteDate time.Time `bson:"lastWriteDate"`
    70  }
    71  
    72  type server struct {
    73  	Type            string
    74  	SetName         string
    75  	SetVersion      uint32
    76  	ElectionID      *primitive.ObjectID `bson:"electionId"`
    77  	MinWireVersion  *int32
    78  	MaxWireVersion  *int32
    79  	TopologyVersion *topologyVersion
    80  	Pool            *testPool
    81  }
    82  
    83  type topologyVersion struct {
    84  	ProcessID primitive.ObjectID `bson:"processId"`
    85  	Counter   int64
    86  }
    87  
    88  type testPool struct {
    89  	Generation uint64
    90  }
    91  
    92  type applicationError struct {
    93  	Address        string
    94  	Generation     *uint64
    95  	MaxWireVersion *int32
    96  	When           string
    97  	Type           string
    98  	Response       bsoncore.Document
    99  }
   100  
   101  type topologyDescription struct {
   102  	TopologyType string              `bson:"topologyType"`
   103  	Servers      []serverDescription `bson:"servers"`
   104  	SetName      string              `bson:"setName,omitempty"`
   105  }
   106  
   107  type serverDescription struct {
   108  	Address  string   `bson:"address"`
   109  	Arbiters []string `bson:"arbiters"`
   110  	Hosts    []string `bson:"hosts"`
   111  	Passives []string `bson:"passives"`
   112  	Primary  string   `bson:"primary,omitempty"`
   113  	SetName  string   `bson:"setName,omitempty"`
   114  	Type     string   `bson:"type"`
   115  }
   116  
   117  type topologyOpeningEvent struct {
   118  	TopologyID string `bson:"topologyId"`
   119  }
   120  
   121  type serverOpeningEvent struct {
   122  	Address    string `bson:"address"`
   123  	TopologyID string `bson:"topologyId"`
   124  }
   125  
   126  type topologyDescriptionChangedEvent struct {
   127  	TopologyID          string              `bson:"topologyId"`
   128  	PreviousDescription topologyDescription `bson:"previousDescription"`
   129  	NewDescription      topologyDescription `bson:"newDescription"`
   130  }
   131  
   132  type serverDescriptionChangedEvent struct {
   133  	Address             string            `bson:"address"`
   134  	TopologyID          string            `bson:"topologyId"`
   135  	PreviousDescription serverDescription `bson:"previousDescription"`
   136  	NewDescription      serverDescription `bson:"newDescription"`
   137  }
   138  
   139  type serverClosedEvent struct {
   140  	Address    string `bson:"address"`
   141  	TopologyID string `bson:"topologyId"`
   142  }
   143  
   144  type monitoringEvent struct {
   145  	TopologyOpeningEvent            *topologyOpeningEvent            `bson:"topology_opening_event,omitempty"`
   146  	ServerOpeningEvent              *serverOpeningEvent              `bson:"server_opening_event,omitempty"`
   147  	TopologyDescriptionChangedEvent *topologyDescriptionChangedEvent `bson:"topology_description_changed_event,omitempty"`
   148  	ServerDescriptionChangedEvent   *serverDescriptionChangedEvent   `bson:"server_description_changed_event,omitempty"`
   149  	ServerClosedEvent               *serverClosedEvent               `bson:"server_closed_event,omitempty"`
   150  }
   151  
   152  type outcome struct {
   153  	Servers                      map[string]server
   154  	TopologyType                 string
   155  	SetName                      string
   156  	LogicalSessionTimeoutMinutes *int64
   157  	MaxSetVersion                uint32
   158  	MaxElectionID                primitive.ObjectID `bson:"maxElectionId"`
   159  	Compatible                   *bool
   160  	Events                       []monitoringEvent
   161  }
   162  
   163  type phase struct {
   164  	Description       string
   165  	Responses         []response
   166  	ApplicationErrors []applicationError
   167  	Outcome           outcome
   168  }
   169  
   170  type testCase struct {
   171  	Description string
   172  	URI         string
   173  	Phases      []phase
   174  }
   175  
   176  func serverDescriptionChanged(e *event.ServerDescriptionChangedEvent) {
   177  	lock.Lock()
   178  	publishedEvents = append(publishedEvents, *e)
   179  	lock.Unlock()
   180  }
   181  
   182  func serverOpening(e *event.ServerOpeningEvent) {
   183  	lock.Lock()
   184  	publishedEvents = append(publishedEvents, *e)
   185  	lock.Unlock()
   186  }
   187  
   188  func topologyDescriptionChanged(e *event.TopologyDescriptionChangedEvent) {
   189  	lock.Lock()
   190  	publishedEvents = append(publishedEvents, *e)
   191  	lock.Unlock()
   192  }
   193  
   194  func topologyOpening(e *event.TopologyOpeningEvent) {
   195  	lock.Lock()
   196  	publishedEvents = append(publishedEvents, *e)
   197  	lock.Unlock()
   198  }
   199  
   200  func serverClosed(e *event.ServerClosedEvent) {
   201  	lock.Lock()
   202  	publishedEvents = append(publishedEvents, *e)
   203  	lock.Unlock()
   204  }
   205  
   206  const testsDir string = "../../../../testdata/server-discovery-and-monitoring/"
   207  
   208  var publishedEvents []interface{}
   209  var lock sync.Mutex
   210  
   211  func (r *response) UnmarshalBSON(buf []byte) error {
   212  	doc := bson.Raw(buf)
   213  	if err := doc.Index(0).Value().Unmarshal(&r.Host); err != nil {
   214  		return fmt.Errorf("error unmarshalling Host: %w", err)
   215  	}
   216  
   217  	if err := doc.Index(1).Value().Unmarshal(&r.Hello); err != nil {
   218  		return fmt.Errorf("error unmarshalling Hello: %w", err)
   219  	}
   220  
   221  	return nil
   222  }
   223  
   224  func setUpTopology(t *testing.T, uri string) *Topology {
   225  	sdam := &event.ServerMonitor{
   226  		ServerDescriptionChanged:   serverDescriptionChanged,
   227  		ServerOpening:              serverOpening,
   228  		TopologyDescriptionChanged: topologyDescriptionChanged,
   229  		TopologyOpening:            topologyOpening,
   230  		ServerClosed:               serverClosed,
   231  	}
   232  
   233  	cfg, err := NewConfig(options.Client().ApplyURI(uri).SetServerMonitor(sdam), nil)
   234  	assert.Nil(t, err, "error constructing topology config: %v", err)
   235  
   236  	// Disable server monitoring because the hosts in the SDAM spec tests don't actually exist, so the server monitor
   237  	// can race with the test and mark the server Unknown when it fails to connect, which causes tests to fail.
   238  	cfg.ServerOpts = append(cfg.ServerOpts, withMonitoringDisabled(func(bool) bool { return true }))
   239  
   240  	topo, err := New(cfg)
   241  	assert.Nil(t, err, "topology.New error: %v", err)
   242  
   243  	err = topo.Connect()
   244  	assert.Nil(t, err, "topology.Connect error: %v", err)
   245  
   246  	return topo
   247  }
   248  
   249  func applyResponses(t *testing.T, topo *Topology, responses []response, sub *driver.Subscription) {
   250  	select {
   251  	case <-sub.Updates:
   252  	default:
   253  	}
   254  	for _, response := range responses {
   255  		doc, err := bson.Marshal(response.Hello)
   256  		assert.Nil(t, err, "Marshal error: %v", err)
   257  
   258  		addr := address.Address(response.Host)
   259  		desc := description.NewServer(addr, doc)
   260  		server, ok := topo.servers[addr]
   261  		if ok {
   262  			server.updateDescription(desc)
   263  		} else {
   264  			// for tests that check that server descriptions that aren't in the topology aren't applied
   265  			topo.apply(context.Background(), desc)
   266  		}
   267  		select {
   268  		case <-sub.Updates:
   269  		default:
   270  			return
   271  		}
   272  	}
   273  }
   274  
   275  type netErr struct {
   276  	timeout bool
   277  }
   278  
   279  func (n netErr) Error() string {
   280  	return "error"
   281  }
   282  
   283  func (n netErr) Timeout() bool {
   284  	return n.timeout
   285  }
   286  
   287  func (n netErr) Temporary() bool {
   288  	return false
   289  }
   290  
   291  var _ net.Error = (*netErr)(nil)
   292  
   293  func applyErrors(t *testing.T, topo *Topology, errors []applicationError) {
   294  	for _, appErr := range errors {
   295  		var currError error
   296  		switch appErr.Type {
   297  		case "command":
   298  			currError = driver.ExtractErrorFromServerResponse(context.Background(), appErr.Response)
   299  		case "network":
   300  			currError = driver.Error{
   301  				Labels:  []string{driver.NetworkError},
   302  				Wrapped: ConnectionError{Wrapped: netErr{false}},
   303  			}
   304  		case "timeout":
   305  			currError = driver.Error{
   306  				Labels:  []string{driver.NetworkError},
   307  				Wrapped: ConnectionError{Wrapped: netErr{true}},
   308  			}
   309  		default:
   310  			t.Fatalf("unrecognized error type: %v", appErr.Type)
   311  		}
   312  		server, ok := topo.servers[address.Address(appErr.Address)]
   313  		assert.True(t, ok, "server not found: %v", appErr.Address)
   314  
   315  		desc := server.Description()
   316  		versionRange := description.NewVersionRange(0, *appErr.MaxWireVersion)
   317  		desc.WireVersion = &versionRange
   318  
   319  		generation, _ := server.pool.generation.getGeneration(nil)
   320  		if appErr.Generation != nil {
   321  			generation = *appErr.Generation
   322  		}
   323  		//use generation number to check conn stale
   324  		innerConn := connection{
   325  			desc:       desc,
   326  			generation: generation,
   327  			pool:       server.pool,
   328  		}
   329  		conn := Connection{connection: &innerConn}
   330  
   331  		switch appErr.When {
   332  		case "beforeHandshakeCompletes":
   333  			server.ProcessHandshakeError(currError, generation, nil)
   334  		case "afterHandshakeCompletes":
   335  			_ = server.ProcessError(currError, &conn)
   336  		default:
   337  			t.Fatalf("unrecognized applicationError.When value: %v", appErr.When)
   338  		}
   339  	}
   340  }
   341  
   342  func compareServerDescriptions(t *testing.T,
   343  	expected serverDescription, actual description.Server, idx int) {
   344  	t.Helper()
   345  
   346  	assert.Equal(t, expected.Address, actual.Addr.String(),
   347  		"%v: expected server address %s, got %s", idx, expected.Address, actual.Addr)
   348  
   349  	assert.Equal(t, len(expected.Hosts), len(actual.Hosts),
   350  		"%v: expected %d hosts, got %d", idx, len(expected.Hosts), len(actual.Hosts))
   351  	for idx, expectedHost := range expected.Hosts {
   352  		actualHost := actual.Hosts[idx]
   353  		assert.Equal(t, expectedHost, actualHost, "%v: expected host %s, got %s", idx, expectedHost, actualHost)
   354  	}
   355  
   356  	assert.Equal(t, len(expected.Passives), len(actual.Passives),
   357  		"%v: expected %d hosts, got %d", idx, len(expected.Passives), len(actual.Passives))
   358  	for idx, expectedPassive := range expected.Passives {
   359  		actualPassive := actual.Passives[idx]
   360  		assert.Equal(t, expectedPassive, actualPassive, "%v: expected passive %s, got %s", idx, expectedPassive, actualPassive)
   361  	}
   362  
   363  	assert.Equal(t, expected.Primary, string(actual.Primary),
   364  		"%v: expected primary %s, got %s", idx, expected.Primary, actual.Primary)
   365  	assert.Equal(t, expected.SetName, actual.SetName,
   366  		"%v: expected set name %s, got %s", idx, expected.SetName, actual.SetName)
   367  
   368  	// PossiblePrimary is only relevant to single-threaded drivers.
   369  	if expected.Type == "PossiblePrimary" {
   370  		expected.Type = "Unknown"
   371  	}
   372  	assert.Equal(t, expected.Type, actual.Kind.String(),
   373  		"%v: expected server kind %s, got %s", idx, expected.Type, actual.Kind.String())
   374  }
   375  
   376  func compareTopologyDescriptions(t *testing.T,
   377  	expected topologyDescription, actual description.Topology, idx int) {
   378  	t.Helper()
   379  
   380  	assert.Equal(t, expected.TopologyType, actual.Kind.String(),
   381  		"%v: expected topology kind %s, got %s", idx, expected.TopologyType, actual.Kind.String())
   382  	assert.Equal(t, len(expected.Servers), len(actual.Servers),
   383  		"%v: expected %d servers, got %d", idx, len(expected.Servers), len(actual.Servers))
   384  
   385  	for idx, es := range expected.Servers {
   386  		as := actual.Servers[idx]
   387  		compareServerDescriptions(t, es, as, idx)
   388  	}
   389  
   390  	assert.Equal(t, expected.SetName, actual.SetName,
   391  		"%v: expected set name %s, got %s", idx, expected.SetName, actual.SetName)
   392  }
   393  
   394  func compareEvents(t *testing.T, events []monitoringEvent) {
   395  	t.Helper()
   396  
   397  	lock.Lock()
   398  	defer lock.Unlock()
   399  
   400  	assert.Equal(t, len(events), len(publishedEvents),
   401  		"expected %d published events, got %d\n",
   402  		len(events), len(publishedEvents))
   403  
   404  	for idx, me := range events {
   405  		if me.TopologyOpeningEvent != nil {
   406  			actual, ok := publishedEvents[idx].(event.TopologyOpeningEvent)
   407  			assert.True(t, ok, "%v: expected type %T, got %T", idx, event.TopologyOpeningEvent{}, publishedEvents[idx])
   408  			assert.False(t, actual.TopologyID.IsZero(), "%v: expected topology id", idx)
   409  		}
   410  		if me.ServerOpeningEvent != nil {
   411  			actual, ok := publishedEvents[idx].(event.ServerOpeningEvent)
   412  			assert.True(t, ok, "%v: expected type %T, got %T", idx, event.ServerOpeningEvent{}, publishedEvents[idx])
   413  
   414  			evt := me.ServerOpeningEvent
   415  			assert.Equal(t, evt.Address, string(actual.Address),
   416  				"%v: expected address %s, got %s", idx, evt.Address, actual.Address)
   417  			assert.False(t, actual.TopologyID.IsZero(), "%v: expected topology id", idx)
   418  		}
   419  		if me.TopologyDescriptionChangedEvent != nil {
   420  			actual, ok := publishedEvents[idx].(event.TopologyDescriptionChangedEvent)
   421  			assert.True(t, ok, "%v: expected type %T, got %T", idx, event.TopologyDescriptionChangedEvent{}, publishedEvents[idx])
   422  
   423  			evt := me.TopologyDescriptionChangedEvent
   424  			compareTopologyDescriptions(t, evt.PreviousDescription, actual.PreviousDescription, idx)
   425  			compareTopologyDescriptions(t, evt.NewDescription, actual.NewDescription, idx)
   426  			assert.False(t, actual.TopologyID.IsZero(), "%v: expected topology id", idx)
   427  		}
   428  		if me.ServerDescriptionChangedEvent != nil {
   429  			actual, ok := publishedEvents[idx].(event.ServerDescriptionChangedEvent)
   430  			assert.True(t, ok, "%v: expected type %T, got %T", idx, event.ServerDescriptionChangedEvent{}, publishedEvents[idx])
   431  
   432  			evt := me.ServerDescriptionChangedEvent
   433  			assert.Equal(t, evt.Address, string(actual.Address),
   434  				"%v: expected server address %s, got %s", idx, evt.Address, actual.Address)
   435  			compareServerDescriptions(t, evt.PreviousDescription, actual.PreviousDescription, idx)
   436  			compareServerDescriptions(t, evt.NewDescription, actual.NewDescription, idx)
   437  			assert.False(t, actual.TopologyID.IsZero(), "%v: expected topology id", idx)
   438  		}
   439  		if me.ServerClosedEvent != nil {
   440  			actual, ok := publishedEvents[idx].(event.ServerClosedEvent)
   441  			assert.True(t, ok, "%v: expected type %T, got %T", idx, event.ServerClosedEvent{}, publishedEvents[idx])
   442  
   443  			evt := me.ServerClosedEvent
   444  			assert.Equal(t, evt.Address, string(actual.Address),
   445  				"%v: expected server address %s, got %s", idx, evt.Address, actual.Address)
   446  			assert.False(t, actual.TopologyID.IsZero(), "%v: expected topology id", idx)
   447  		}
   448  	}
   449  }
   450  
   451  func findServerInTopology(topo description.Topology, addr address.Address) (description.Server, bool) {
   452  	for _, server := range topo.Servers {
   453  		if server.Addr.String() == addr.String() {
   454  			return server, true
   455  		}
   456  	}
   457  	return description.Server{}, false
   458  }
   459  
   460  func runTest(t *testing.T, directory string, filename string) {
   461  	filepath := path.Join(testsDir, directory, filename)
   462  	content, err := ioutil.ReadFile(filepath)
   463  	assert.Nil(t, err, "ReadFile error: %v", err)
   464  
   465  	// Remove ".json" from filename.
   466  	filename = filename[:len(filename)-5]
   467  	testName := directory + "/" + filename + ":"
   468  
   469  	t.Run(testName, func(t *testing.T) {
   470  		var test testCase
   471  		err = bson.UnmarshalExtJSON(content, false, &test)
   472  		assert.Nil(t, err, "Unmarshal error: %v", err)
   473  		topo := setUpTopology(t, test.URI)
   474  		sub, err := topo.Subscribe()
   475  		assert.Nil(t, err, "subscribe error: %v", err)
   476  
   477  		for _, phase := range test.Phases {
   478  			applyResponses(t, topo, phase.Responses, sub)
   479  			applyErrors(t, topo, phase.ApplicationErrors)
   480  
   481  			if phase.Outcome.Events != nil {
   482  				compareEvents(t, phase.Outcome.Events)
   483  				publishedEvents = nil
   484  				continue
   485  			}
   486  			publishedEvents = nil
   487  			if phase.Outcome.Compatible == nil || *phase.Outcome.Compatible {
   488  				assert.True(t, topo.fsm.compatible.Load().(bool), "Expected servers to be compatible")
   489  				assert.Nil(t, topo.fsm.compatibilityErr, "expected fsm.compatibility to be nil, got %v",
   490  					topo.fsm.compatibilityErr)
   491  			} else {
   492  				assert.False(t, topo.fsm.compatible.Load().(bool), "Expected servers to not be compatible")
   493  				assert.NotNil(t, topo.fsm.compatibilityErr, "expected fsm.compatibility error to be non-nil")
   494  				continue
   495  			}
   496  			desc := topo.Description()
   497  
   498  			assert.Equal(t, phase.Outcome.TopologyType, desc.Kind.String(),
   499  				"expected TopologyType to be %v, got %v", phase.Outcome.TopologyType, desc.Kind.String())
   500  			assert.Equal(t, phase.Outcome.SetName, topo.fsm.SetName,
   501  				"expected SetName to be %v, got %v", phase.Outcome.SetName, topo.fsm.SetName)
   502  			assert.Equal(t, len(phase.Outcome.Servers), len(desc.Servers),
   503  				"expected %v servers, got %v", len(phase.Outcome.Servers), len(desc.Servers))
   504  
   505  			assert.Equal(t,
   506  				phase.Outcome.LogicalSessionTimeoutMinutes,
   507  				desc.SessionTimeoutMinutesPtr,
   508  				"expected and actual logical session timeout minutes are different")
   509  
   510  			assert.Equal(t, phase.Outcome.MaxSetVersion, topo.fsm.maxSetVersion,
   511  				"expected maxSetVersion to be %v, got %v", phase.Outcome.MaxSetVersion, topo.fsm.maxSetVersion)
   512  			assert.Equal(t, phase.Outcome.MaxElectionID, topo.fsm.maxElectionID,
   513  				"expected maxElectionID to be %v, got %v", phase.Outcome.MaxElectionID, topo.fsm.maxElectionID)
   514  
   515  			for addr, server := range phase.Outcome.Servers {
   516  				fsmServer, ok := findServerInTopology(desc, address.Address(addr))
   517  				assert.True(t, ok, "Couldn't find server %v", addr)
   518  
   519  				assert.Equal(t, address.Address(addr), fsmServer.Addr,
   520  					"expected server address to be %v, got %v", address.Address(addr), fsmServer.Addr)
   521  				assert.Equal(t, server.SetName, fsmServer.SetName,
   522  					"expected server SetName to be %v, got %v", server.SetName, fsmServer.SetName)
   523  				assert.Equal(t, server.SetVersion, fsmServer.SetVersion,
   524  					"expected server SetVersion to be %v, got %v", server.SetVersion, fsmServer.SetVersion)
   525  				if server.ElectionID != nil {
   526  					assert.Equal(t, *server.ElectionID, fsmServer.ElectionID,
   527  						"expected server ElectionID to be %v, got %v", *server.ElectionID, fsmServer.ElectionID)
   528  				}
   529  				if server.TopologyVersion != nil {
   530  
   531  					assert.NotNil(t, fsmServer.TopologyVersion, "expected server TopologyVersion not to be nil")
   532  					assert.Equal(t, server.TopologyVersion.ProcessID, fsmServer.TopologyVersion.ProcessID,
   533  						"expected server TopologyVersion ProcessID to be %v, got %v", server.TopologyVersion.ProcessID, fsmServer.TopologyVersion.ProcessID)
   534  					assert.Equal(t, server.TopologyVersion.Counter, fsmServer.TopologyVersion.Counter,
   535  						"expected server TopologyVersion Counter to be %v, got %v", server.TopologyVersion.Counter, fsmServer.TopologyVersion.Counter)
   536  				} else {
   537  					assert.Nil(t, fsmServer.TopologyVersion, "expected server TopologyVersion to be nil")
   538  				}
   539  
   540  				// PossiblePrimary is only relevant to single-threaded drivers.
   541  				if server.Type == "PossiblePrimary" {
   542  					server.Type = "Unknown"
   543  				}
   544  
   545  				assert.Equal(t, server.Type, fsmServer.Kind.String(),
   546  					"expected server Type to be %v, got %v", server.Type, fsmServer.Kind.String())
   547  				if server.Pool != nil {
   548  					topo.serversLock.Lock()
   549  					actualServer := topo.servers[address.Address(addr)]
   550  					topo.serversLock.Unlock()
   551  					actualGeneration, _ := actualServer.pool.generation.getGeneration(nil)
   552  					assert.Equal(t, server.Pool.Generation, actualGeneration,
   553  						"expected server pool generation to be %v, got %v", server.Pool.Generation, actualGeneration)
   554  				}
   555  			}
   556  		}
   557  	})
   558  }
   559  
   560  // Test case for all SDAM spec tests.
   561  func TestSDAMSpec(t *testing.T) {
   562  	for _, subdir := range []string{"single", "rs", "sharded", "load-balanced", "errors", "monitoring"} {
   563  		for _, file := range spectest.FindJSONFilesInDir(t, path.Join(testsDir, subdir)) {
   564  			runTest(t, subdir, file)
   565  		}
   566  	}
   567  }
   568  
   569  func TestHasStalePrimary(t *testing.T) {
   570  	t.Parallel()
   571  
   572  	t.Run("WV17 SEI EQ MEI and SSV LT MSV", func(t *testing.T) {
   573  		t.Parallel()
   574  
   575  		srv := description.Server{
   576  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   577  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now()),
   578  			SetVersion:  1,
   579  		}
   580  
   581  		fsm := fsm{
   582  			maxElectionID: srv.ElectionID,
   583  			maxSetVersion: 2,
   584  		}
   585  
   586  		boolVal := hasStalePrimary(fsm, srv)
   587  		assert.True(t, boolVal, "expected true, got false")
   588  	})
   589  
   590  	t.Run("WV17 SEI EQ MEI and SSV GT MSV", func(t *testing.T) {
   591  		t.Parallel()
   592  
   593  		srv := description.Server{
   594  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   595  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now()),
   596  			SetVersion:  2,
   597  		}
   598  
   599  		fsm := fsm{
   600  			maxElectionID: srv.ElectionID,
   601  			maxSetVersion: 1,
   602  		}
   603  
   604  		boolVal := hasStalePrimary(fsm, srv)
   605  		assert.False(t, boolVal, "expected false, got true")
   606  	})
   607  
   608  	t.Run("WV17 SEI EQ MEI and SSV EQ MSV", func(t *testing.T) {
   609  		t.Parallel()
   610  
   611  		srv := description.Server{
   612  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   613  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now()),
   614  			SetVersion:  1,
   615  		}
   616  
   617  		fsm := fsm{
   618  			maxElectionID: srv.ElectionID,
   619  			maxSetVersion: 1,
   620  		}
   621  
   622  		boolVal := hasStalePrimary(fsm, srv)
   623  		assert.False(t, boolVal, "expected false, got true")
   624  	})
   625  
   626  	t.Run("WV17 SEI GT MEI and SSV LT MSV", func(t *testing.T) {
   627  		t.Parallel()
   628  
   629  		fsm := fsm{
   630  			maxElectionID: primitive.NewObjectIDFromTimestamp(time.Now()),
   631  			maxSetVersion: 2,
   632  		}
   633  
   634  		srv := description.Server{
   635  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   636  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now().Add(time.Second)),
   637  			SetVersion:  1,
   638  		}
   639  
   640  		boolVal := hasStalePrimary(fsm, srv)
   641  		assert.False(t, boolVal, "expected false, got true")
   642  	})
   643  
   644  	t.Run("WV17 SEI GT MEI and SSV GT MSV", func(t *testing.T) {
   645  		t.Parallel()
   646  
   647  		fsm := fsm{
   648  			maxElectionID: primitive.NewObjectIDFromTimestamp(time.Now()),
   649  			maxSetVersion: 1,
   650  		}
   651  
   652  		srv := description.Server{
   653  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   654  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now().Add(time.Second)),
   655  			SetVersion:  2,
   656  		}
   657  
   658  		boolVal := hasStalePrimary(fsm, srv)
   659  		assert.False(t, boolVal, "expected false, got true")
   660  	})
   661  
   662  	t.Run("WV17 SEI GT MEI and SSV EQ MSV", func(t *testing.T) {
   663  		t.Parallel()
   664  
   665  		fsm := fsm{
   666  			maxElectionID: primitive.NewObjectIDFromTimestamp(time.Now()),
   667  			maxSetVersion: 1,
   668  		}
   669  
   670  		srv := description.Server{
   671  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   672  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now().Add(time.Second)),
   673  			SetVersion:  1,
   674  		}
   675  
   676  		boolVal := hasStalePrimary(fsm, srv)
   677  		assert.False(t, boolVal, "expected false, got true")
   678  	})
   679  
   680  	t.Run("WV17 SEI LT MEI and SSV LT MSV", func(t *testing.T) {
   681  		t.Parallel()
   682  
   683  		srv := description.Server{
   684  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   685  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now().Add(-time.Second)),
   686  			SetVersion:  1,
   687  		}
   688  
   689  		fsm := fsm{
   690  			maxElectionID: primitive.NewObjectIDFromTimestamp(time.Now()),
   691  			maxSetVersion: 2,
   692  		}
   693  
   694  		boolVal := hasStalePrimary(fsm, srv)
   695  		assert.True(t, boolVal, "expected true, got false")
   696  	})
   697  
   698  	t.Run("WV17 SEI LT MEI and SSV GT MSV", func(t *testing.T) {
   699  		t.Parallel()
   700  
   701  		srv := description.Server{
   702  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   703  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now().Add(-time.Second)),
   704  			SetVersion:  2,
   705  		}
   706  
   707  		fsm := fsm{
   708  			maxElectionID: primitive.NewObjectIDFromTimestamp(time.Now()),
   709  			maxSetVersion: 1,
   710  		}
   711  
   712  		boolVal := hasStalePrimary(fsm, srv)
   713  		assert.True(t, boolVal, "expected true, got false")
   714  	})
   715  
   716  	t.Run("WV17 SEI LT MEI and SSV EQ MSV", func(t *testing.T) {
   717  		t.Parallel()
   718  
   719  		srv := description.Server{
   720  			WireVersion: &description.VersionRange{Min: 17, Max: 17},
   721  			ElectionID:  primitive.NewObjectIDFromTimestamp(time.Now().Add(-time.Second)),
   722  			SetVersion:  1,
   723  		}
   724  
   725  		fsm := fsm{
   726  			maxElectionID: primitive.NewObjectIDFromTimestamp(time.Now()),
   727  			maxSetVersion: 1,
   728  		}
   729  
   730  		boolVal := hasStalePrimary(fsm, srv)
   731  		assert.True(t, boolVal, "expected true, got false")
   732  	})
   733  }
   734  

View as plain text