...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/fsm.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/topology

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package topology
     8  
     9  import (
    10  	"bytes"
    11  	"fmt"
    12  	"sync/atomic"
    13  
    14  	"go.mongodb.org/mongo-driver/bson/primitive"
    15  	"go.mongodb.org/mongo-driver/internal/ptrutil"
    16  	"go.mongodb.org/mongo-driver/mongo/address"
    17  	"go.mongodb.org/mongo-driver/mongo/description"
    18  )
    19  
    20  var (
    21  	// MinSupportedMongoDBVersion is the version string for the lowest MongoDB version supported by the driver.
    22  	MinSupportedMongoDBVersion = "3.6"
    23  
    24  	// SupportedWireVersions is the range of wire versions supported by the driver.
    25  	SupportedWireVersions = description.NewVersionRange(6, 21)
    26  )
    27  
// fsm is the finite state machine that maintains the topology description as server descriptions are applied to it
// via apply.
type fsm struct {
	// Topology is the current topology description. apply rebuilds it on every call, carrying over Kind, Servers and
	// SetName.
	description.Topology
	// maxElectionID is the largest election ID recorded by transferEVTuple.
	maxElectionID    primitive.ObjectID
	// maxSetVersion is the largest set version recorded by transferEVTuple.
	maxSetVersion    uint32
	// compatible holds a bool: false when apply found a server whose wire version range does not overlap
	// SupportedWireVersions, true otherwise (newFSM initializes it to true). NOTE(review): presumably an atomic.Value
	// so it can be read outside the lock that guards apply — confirm against callers.
	compatible       atomic.Value
	// compatibilityErr describes the incompatibility found by the most recent apply, or nil when compatible.
	compatibilityErr error
}
    35  
    36  func newFSM() *fsm {
    37  	f := fsm{}
    38  	f.compatible.Store(true)
    39  	return &f
    40  }
    41  
    42  // selectFSMSessionTimeout selects the timeout to return for the topology's
    43  // finite state machine. If the logicalSessionTimeoutMinutes on the FSM exists
    44  // and the server is data-bearing, then we determine this value by returning
    45  //
    46  //	min{server timeout, FSM timeout}
    47  //
    48  // where a "nil" value is considered less than 0.
    49  //
    50  // Otherwise, if the FSM's logicalSessionTimeoutMinutes exist, then this
    51  // function returns the FSM timeout.
    52  //
    53  // In the case where the FSM timeout DNE, we check all servers to see if any
    54  // still do not have a timeout. This function chooses the lowest of the existing
    55  // timeouts.
    56  func selectFSMSessionTimeout(f *fsm, s description.Server) *int64 {
    57  	oldMinutes := f.SessionTimeoutMinutesPtr
    58  	comp := ptrutil.CompareInt64(oldMinutes, s.SessionTimeoutMinutesPtr)
    59  
    60  	// If the server is data-bearing and the current timeout exists and is
    61  	// either:
    62  	//
    63  	// 1. larger than the server timeout, or
    64  	// 2. non-nil while the server timeout is nil
    65  	//
    66  	// then return the server timeout.
    67  	if s.DataBearing() && (comp == 1 || comp == 2) {
    68  		return s.SessionTimeoutMinutesPtr
    69  	}
    70  
    71  	// If the current timeout exists and the server is not data-bearing OR
    72  	// min{server timeout, current timeout} = current timeout, then return
    73  	// the current timeout.
    74  	if oldMinutes != nil {
    75  		return oldMinutes
    76  	}
    77  
    78  	timeout := s.SessionTimeoutMinutesPtr
    79  	for _, server := range f.Servers {
    80  		// If the server is not data-bearing, then we do not consider
    81  		// it's timeout whether set or not.
    82  		if !server.DataBearing() {
    83  			continue
    84  		}
    85  
    86  		srvTimeout := server.SessionTimeoutMinutesPtr
    87  		comp := ptrutil.CompareInt64(timeout, srvTimeout)
    88  
    89  		if comp <= 0 { // timeout <= srvTimout
    90  			continue
    91  		}
    92  
    93  		timeout = server.SessionTimeoutMinutesPtr
    94  	}
    95  
    96  	return timeout
    97  }
    98  
    99  // apply takes a new server description and modifies the FSM's topology description based on it. It returns the
   100  // updated topology description as well as a server description. The returned server description is either the same
   101  // one that was passed in, or a new one in the case that it had to be changed.
   102  //
   103  // apply should operation on immutable descriptions so we don't have to lock for the entire time we're applying the
   104  // server description.
   105  func (f *fsm) apply(s description.Server) (description.Topology, description.Server) {
   106  	newServers := make([]description.Server, len(f.Servers))
   107  	copy(newServers, f.Servers)
   108  
   109  	// Reset the logicalSessionTimeoutMinutes to the minimum of the FSM
   110  	// and the description.server/f.servers.
   111  	serverTimeoutMinutes := selectFSMSessionTimeout(f, s)
   112  
   113  	f.Topology = description.Topology{
   114  		Kind:    f.Kind,
   115  		Servers: newServers,
   116  		SetName: f.SetName,
   117  	}
   118  
   119  	f.Topology.SessionTimeoutMinutesPtr = serverTimeoutMinutes
   120  
   121  	if serverTimeoutMinutes != nil {
   122  		f.SessionTimeoutMinutes = uint32(*serverTimeoutMinutes)
   123  	}
   124  
   125  	if _, ok := f.findServer(s.Addr); !ok {
   126  		return f.Topology, s
   127  	}
   128  
   129  	updatedDesc := s
   130  	switch f.Kind {
   131  	case description.Unknown:
   132  		updatedDesc = f.applyToUnknown(s)
   133  	case description.Sharded:
   134  		updatedDesc = f.applyToSharded(s)
   135  	case description.ReplicaSetNoPrimary:
   136  		updatedDesc = f.applyToReplicaSetNoPrimary(s)
   137  	case description.ReplicaSetWithPrimary:
   138  		updatedDesc = f.applyToReplicaSetWithPrimary(s)
   139  	case description.Single:
   140  		updatedDesc = f.applyToSingle(s)
   141  	}
   142  
   143  	for _, server := range f.Servers {
   144  		if server.WireVersion != nil {
   145  			if server.WireVersion.Max < SupportedWireVersions.Min {
   146  				f.compatible.Store(false)
   147  				f.compatibilityErr = fmt.Errorf(
   148  					"server at %s reports wire version %d, but this version of the Go driver requires "+
   149  						"at least %d (MongoDB %s)",
   150  					server.Addr.String(),
   151  					server.WireVersion.Max,
   152  					SupportedWireVersions.Min,
   153  					MinSupportedMongoDBVersion,
   154  				)
   155  				f.Topology.CompatibilityErr = f.compatibilityErr
   156  				return f.Topology, s
   157  			}
   158  
   159  			if server.WireVersion.Min > SupportedWireVersions.Max {
   160  				f.compatible.Store(false)
   161  				f.compatibilityErr = fmt.Errorf(
   162  					"server at %s requires wire version %d, but this version of the Go driver only supports up to %d",
   163  					server.Addr.String(),
   164  					server.WireVersion.Min,
   165  					SupportedWireVersions.Max,
   166  				)
   167  				f.Topology.CompatibilityErr = f.compatibilityErr
   168  				return f.Topology, s
   169  			}
   170  		}
   171  	}
   172  
   173  	f.compatible.Store(true)
   174  	f.compatibilityErr = nil
   175  
   176  	return f.Topology, updatedDesc
   177  }
   178  
   179  func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) description.Server {
   180  	switch s.Kind {
   181  	case description.Standalone, description.Mongos:
   182  		f.removeServerByAddr(s.Addr)
   183  	case description.RSPrimary:
   184  		f.updateRSFromPrimary(s)
   185  	case description.RSSecondary, description.RSArbiter, description.RSMember:
   186  		f.updateRSWithoutPrimary(s)
   187  	case description.Unknown, description.RSGhost:
   188  		f.replaceServer(s)
   189  	}
   190  
   191  	return s
   192  }
   193  
   194  func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) description.Server {
   195  	switch s.Kind {
   196  	case description.Standalone, description.Mongos:
   197  		f.removeServerByAddr(s.Addr)
   198  		f.checkIfHasPrimary()
   199  	case description.RSPrimary:
   200  		f.updateRSFromPrimary(s)
   201  	case description.RSSecondary, description.RSArbiter, description.RSMember:
   202  		f.updateRSWithPrimaryFromMember(s)
   203  	case description.Unknown, description.RSGhost:
   204  		f.replaceServer(s)
   205  		f.checkIfHasPrimary()
   206  	}
   207  
   208  	return s
   209  }
   210  
   211  func (f *fsm) applyToSharded(s description.Server) description.Server {
   212  	switch s.Kind {
   213  	case description.Mongos, description.Unknown:
   214  		f.replaceServer(s)
   215  	case description.Standalone, description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
   216  		f.removeServerByAddr(s.Addr)
   217  	}
   218  
   219  	return s
   220  }
   221  
   222  func (f *fsm) applyToSingle(s description.Server) description.Server {
   223  	switch s.Kind {
   224  	case description.Unknown:
   225  		f.replaceServer(s)
   226  	case description.Standalone, description.Mongos:
   227  		if f.SetName != "" {
   228  			f.removeServerByAddr(s.Addr)
   229  			return s
   230  		}
   231  
   232  		f.replaceServer(s)
   233  	case description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
   234  		// A replica set name can be provided when creating a direct connection. In this case, if the set name returned
   235  		// by the hello response doesn't match up with the one provided during configuration, the server description
   236  		// is replaced with a default Unknown description.
   237  		//
   238  		// We create a new server description rather than doing s.Kind = description.Unknown because the other fields,
   239  		// such as RTT, need to be cleared for Unknown descriptions as well.
   240  		if f.SetName != "" && f.SetName != s.SetName {
   241  			s = description.Server{
   242  				Addr: s.Addr,
   243  				Kind: description.Unknown,
   244  			}
   245  		}
   246  
   247  		f.replaceServer(s)
   248  	}
   249  
   250  	return s
   251  }
   252  
   253  func (f *fsm) applyToUnknown(s description.Server) description.Server {
   254  	switch s.Kind {
   255  	case description.Mongos:
   256  		f.setKind(description.Sharded)
   257  		f.replaceServer(s)
   258  	case description.RSPrimary:
   259  		f.updateRSFromPrimary(s)
   260  	case description.RSSecondary, description.RSArbiter, description.RSMember:
   261  		f.setKind(description.ReplicaSetNoPrimary)
   262  		f.updateRSWithoutPrimary(s)
   263  	case description.Standalone:
   264  		f.updateUnknownWithStandalone(s)
   265  	case description.Unknown, description.RSGhost:
   266  		f.replaceServer(s)
   267  	}
   268  
   269  	return s
   270  }
   271  
   272  func (f *fsm) checkIfHasPrimary() {
   273  	if _, ok := f.findPrimary(); ok {
   274  		f.setKind(description.ReplicaSetWithPrimary)
   275  	} else {
   276  		f.setKind(description.ReplicaSetNoPrimary)
   277  	}
   278  }
   279  
   280  // hasStalePrimary returns true if the topology has a primary that is "stale".
   281  func hasStalePrimary(fsm fsm, srv description.Server) bool {
   282  	// Compare the election ID values of the server and the topology lexicographically.
   283  	compRes := bytes.Compare(srv.ElectionID[:], fsm.maxElectionID[:])
   284  
   285  	if wireVersion := srv.WireVersion; wireVersion != nil && wireVersion.Max >= 17 {
   286  		// In the Post-6.0 case, a primary is considered "stale" if the server's election ID is greater than the
   287  		// topology's max election ID. In these versions, the primary is also considered "stale" if the server's
   288  		// election ID is LTE to the topologies election ID and the server's "setVersion" is less than the topology's
   289  		// max "setVersion".
   290  		return compRes == -1 || (compRes != 1 && srv.SetVersion < fsm.maxSetVersion)
   291  	}
   292  
   293  	// If the server's election ID is less than the topology's max election ID, the primary is considered
   294  	// "stale". Similarly, if the server's "setVersion" is less than the topology's max "setVersion", the
   295  	// primary is considered stale.
   296  	return compRes == -1 || fsm.maxSetVersion > srv.SetVersion
   297  }
   298  
   299  // transferEVTuple will transfer the ("ElectionID", "SetVersion") tuple from the description server to the topology.
   300  // If the primary is stale, the tuple will not be transferred, the topology will update it's "Kind" value, and this
   301  // routine will return "false".
   302  func transferEVTuple(srv description.Server, fsm *fsm) bool {
   303  	stalePrimary := hasStalePrimary(*fsm, srv)
   304  
   305  	if wireVersion := srv.WireVersion; wireVersion != nil && wireVersion.Max >= 17 {
   306  		if stalePrimary {
   307  			fsm.checkIfHasPrimary()
   308  			return false
   309  		}
   310  
   311  		fsm.maxElectionID = srv.ElectionID
   312  		fsm.maxSetVersion = srv.SetVersion
   313  
   314  		return true
   315  	}
   316  
   317  	if srv.SetVersion != 0 && !srv.ElectionID.IsZero() {
   318  		if stalePrimary {
   319  			fsm.replaceServer(description.Server{
   320  				Addr: srv.Addr,
   321  				LastError: fmt.Errorf(
   322  					"was a primary, but its set version or election id is stale"),
   323  			})
   324  
   325  			fsm.checkIfHasPrimary()
   326  
   327  			return false
   328  		}
   329  
   330  		fsm.maxElectionID = srv.ElectionID
   331  	}
   332  
   333  	if srv.SetVersion > fsm.maxSetVersion {
   334  		fsm.maxSetVersion = srv.SetVersion
   335  	}
   336  
   337  	return true
   338  }
   339  
   340  func (f *fsm) updateRSFromPrimary(srv description.Server) {
   341  	if f.SetName == "" {
   342  		f.SetName = srv.SetName
   343  	} else if f.SetName != srv.SetName {
   344  		f.removeServerByAddr(srv.Addr)
   345  		f.checkIfHasPrimary()
   346  
   347  		return
   348  	}
   349  
   350  	if ok := transferEVTuple(srv, f); !ok {
   351  		return
   352  	}
   353  
   354  	if j, ok := f.findPrimary(); ok {
   355  		f.setServer(j, description.Server{
   356  			Addr:      f.Servers[j].Addr,
   357  			LastError: fmt.Errorf("was a primary, but a new primary was discovered"),
   358  		})
   359  	}
   360  
   361  	f.replaceServer(srv)
   362  
   363  	for j := len(f.Servers) - 1; j >= 0; j-- {
   364  		found := false
   365  		for _, member := range srv.Members {
   366  			if member == f.Servers[j].Addr {
   367  				found = true
   368  				break
   369  			}
   370  		}
   371  
   372  		if !found {
   373  			f.removeServer(j)
   374  		}
   375  	}
   376  
   377  	for _, member := range srv.Members {
   378  		if _, ok := f.findServer(member); !ok {
   379  			f.addServer(member)
   380  		}
   381  	}
   382  
   383  	f.checkIfHasPrimary()
   384  }
   385  
   386  func (f *fsm) updateRSWithPrimaryFromMember(s description.Server) {
   387  	if f.SetName != s.SetName {
   388  		f.removeServerByAddr(s.Addr)
   389  		f.checkIfHasPrimary()
   390  		return
   391  	}
   392  
   393  	if s.Addr != s.CanonicalAddr {
   394  		f.removeServerByAddr(s.Addr)
   395  		f.checkIfHasPrimary()
   396  		return
   397  	}
   398  
   399  	f.replaceServer(s)
   400  
   401  	if _, ok := f.findPrimary(); !ok {
   402  		f.setKind(description.ReplicaSetNoPrimary)
   403  	}
   404  }
   405  
   406  func (f *fsm) updateRSWithoutPrimary(s description.Server) {
   407  	if f.SetName == "" {
   408  		f.SetName = s.SetName
   409  	} else if f.SetName != s.SetName {
   410  		f.removeServerByAddr(s.Addr)
   411  		return
   412  	}
   413  
   414  	for _, member := range s.Members {
   415  		if _, ok := f.findServer(member); !ok {
   416  			f.addServer(member)
   417  		}
   418  	}
   419  
   420  	if s.Addr != s.CanonicalAddr {
   421  		f.removeServerByAddr(s.Addr)
   422  		return
   423  	}
   424  
   425  	f.replaceServer(s)
   426  }
   427  
   428  func (f *fsm) updateUnknownWithStandalone(s description.Server) {
   429  	if len(f.Servers) > 1 {
   430  		f.removeServerByAddr(s.Addr)
   431  		return
   432  	}
   433  
   434  	f.setKind(description.Single)
   435  	f.replaceServer(s)
   436  }
   437  
   438  func (f *fsm) addServer(addr address.Address) {
   439  	f.Servers = append(f.Servers, description.Server{
   440  		Addr: addr.Canonicalize(),
   441  	})
   442  }
   443  
   444  func (f *fsm) findPrimary() (int, bool) {
   445  	for i, s := range f.Servers {
   446  		if s.Kind == description.RSPrimary {
   447  			return i, true
   448  		}
   449  	}
   450  
   451  	return 0, false
   452  }
   453  
   454  func (f *fsm) findServer(addr address.Address) (int, bool) {
   455  	canon := addr.Canonicalize()
   456  	for i, s := range f.Servers {
   457  		if canon == s.Addr {
   458  			return i, true
   459  		}
   460  	}
   461  
   462  	return 0, false
   463  }
   464  
   465  func (f *fsm) removeServer(i int) {
   466  	f.Servers = append(f.Servers[:i], f.Servers[i+1:]...)
   467  }
   468  
   469  func (f *fsm) removeServerByAddr(addr address.Address) {
   470  	if i, ok := f.findServer(addr); ok {
   471  		f.removeServer(i)
   472  	}
   473  }
   474  
   475  func (f *fsm) replaceServer(s description.Server) {
   476  	if i, ok := f.findServer(s.Addr); ok {
   477  		f.setServer(i, s)
   478  	}
   479  }
   480  
   481  func (f *fsm) setServer(i int, s description.Server) {
   482  	f.Servers[i] = s
   483  }
   484  
   485  func (f *fsm) setKind(k description.TopologyKind) {
   486  	f.Kind = k
   487  }
   488  

View as plain text