1
2
3
4
5
6
7 package topology
8
9 import (
10 "bytes"
11 "fmt"
12 "sync/atomic"
13
14 "go.mongodb.org/mongo-driver/bson/primitive"
15 "go.mongodb.org/mongo-driver/internal/ptrutil"
16 "go.mongodb.org/mongo-driver/mongo/address"
17 "go.mongodb.org/mongo-driver/mongo/description"
18 )
19
var (
	// MinSupportedMongoDBVersion is the MongoDB version string quoted in the
	// compatibility error produced when a server's maximum wire version is
	// below SupportedWireVersions.Min.
	MinSupportedMongoDBVersion = "3.6"

	// SupportedWireVersions is the wire version range (6 through 21) that a
	// server's own wire version range is checked against. A server whose range
	// lies entirely outside it causes the topology to be reported as
	// incompatible.
	SupportedWireVersions = description.NewVersionRange(6, 21)
)
27
// fsm is the state machine that folds individual server descriptions into a
// topology description. It embeds the current description.Topology and keeps
// the extra bookkeeping needed between updates.
type fsm struct {
	description.Topology
	// maxElectionID is the election ID recorded from the most recently
	// accepted primary; it is compared against incoming primaries to detect
	// stale ones.
	maxElectionID primitive.ObjectID
	// maxSetVersion is the replica set version recorded alongside
	// maxElectionID and used in the same staleness comparison.
	maxSetVersion uint32
	// compatible stores a bool: false after an update that found a server
	// whose wire version range does not overlap SupportedWireVersions, true
	// otherwise.
	compatible atomic.Value
	// compatibilityErr is the error describing the last failed wire version
	// check, or nil when the last check passed.
	compatibilityErr error
}
35
36 func newFSM() *fsm {
37 f := fsm{}
38 f.compatible.Store(true)
39 return &f
40 }
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 func selectFSMSessionTimeout(f *fsm, s description.Server) *int64 {
57 oldMinutes := f.SessionTimeoutMinutesPtr
58 comp := ptrutil.CompareInt64(oldMinutes, s.SessionTimeoutMinutesPtr)
59
60
61
62
63
64
65
66
67 if s.DataBearing() && (comp == 1 || comp == 2) {
68 return s.SessionTimeoutMinutesPtr
69 }
70
71
72
73
74 if oldMinutes != nil {
75 return oldMinutes
76 }
77
78 timeout := s.SessionTimeoutMinutesPtr
79 for _, server := range f.Servers {
80
81
82 if !server.DataBearing() {
83 continue
84 }
85
86 srvTimeout := server.SessionTimeoutMinutesPtr
87 comp := ptrutil.CompareInt64(timeout, srvTimeout)
88
89 if comp <= 0 {
90 continue
91 }
92
93 timeout = server.SessionTimeoutMinutesPtr
94 }
95
96 return timeout
97 }
98
99
100
101
102
103
104
105 func (f *fsm) apply(s description.Server) (description.Topology, description.Server) {
106 newServers := make([]description.Server, len(f.Servers))
107 copy(newServers, f.Servers)
108
109
110
111 serverTimeoutMinutes := selectFSMSessionTimeout(f, s)
112
113 f.Topology = description.Topology{
114 Kind: f.Kind,
115 Servers: newServers,
116 SetName: f.SetName,
117 }
118
119 f.Topology.SessionTimeoutMinutesPtr = serverTimeoutMinutes
120
121 if serverTimeoutMinutes != nil {
122 f.SessionTimeoutMinutes = uint32(*serverTimeoutMinutes)
123 }
124
125 if _, ok := f.findServer(s.Addr); !ok {
126 return f.Topology, s
127 }
128
129 updatedDesc := s
130 switch f.Kind {
131 case description.Unknown:
132 updatedDesc = f.applyToUnknown(s)
133 case description.Sharded:
134 updatedDesc = f.applyToSharded(s)
135 case description.ReplicaSetNoPrimary:
136 updatedDesc = f.applyToReplicaSetNoPrimary(s)
137 case description.ReplicaSetWithPrimary:
138 updatedDesc = f.applyToReplicaSetWithPrimary(s)
139 case description.Single:
140 updatedDesc = f.applyToSingle(s)
141 }
142
143 for _, server := range f.Servers {
144 if server.WireVersion != nil {
145 if server.WireVersion.Max < SupportedWireVersions.Min {
146 f.compatible.Store(false)
147 f.compatibilityErr = fmt.Errorf(
148 "server at %s reports wire version %d, but this version of the Go driver requires "+
149 "at least %d (MongoDB %s)",
150 server.Addr.String(),
151 server.WireVersion.Max,
152 SupportedWireVersions.Min,
153 MinSupportedMongoDBVersion,
154 )
155 f.Topology.CompatibilityErr = f.compatibilityErr
156 return f.Topology, s
157 }
158
159 if server.WireVersion.Min > SupportedWireVersions.Max {
160 f.compatible.Store(false)
161 f.compatibilityErr = fmt.Errorf(
162 "server at %s requires wire version %d, but this version of the Go driver only supports up to %d",
163 server.Addr.String(),
164 server.WireVersion.Min,
165 SupportedWireVersions.Max,
166 )
167 f.Topology.CompatibilityErr = f.compatibilityErr
168 return f.Topology, s
169 }
170 }
171 }
172
173 f.compatible.Store(true)
174 f.compatibilityErr = nil
175
176 return f.Topology, updatedDesc
177 }
178
179 func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) description.Server {
180 switch s.Kind {
181 case description.Standalone, description.Mongos:
182 f.removeServerByAddr(s.Addr)
183 case description.RSPrimary:
184 f.updateRSFromPrimary(s)
185 case description.RSSecondary, description.RSArbiter, description.RSMember:
186 f.updateRSWithoutPrimary(s)
187 case description.Unknown, description.RSGhost:
188 f.replaceServer(s)
189 }
190
191 return s
192 }
193
194 func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) description.Server {
195 switch s.Kind {
196 case description.Standalone, description.Mongos:
197 f.removeServerByAddr(s.Addr)
198 f.checkIfHasPrimary()
199 case description.RSPrimary:
200 f.updateRSFromPrimary(s)
201 case description.RSSecondary, description.RSArbiter, description.RSMember:
202 f.updateRSWithPrimaryFromMember(s)
203 case description.Unknown, description.RSGhost:
204 f.replaceServer(s)
205 f.checkIfHasPrimary()
206 }
207
208 return s
209 }
210
211 func (f *fsm) applyToSharded(s description.Server) description.Server {
212 switch s.Kind {
213 case description.Mongos, description.Unknown:
214 f.replaceServer(s)
215 case description.Standalone, description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
216 f.removeServerByAddr(s.Addr)
217 }
218
219 return s
220 }
221
222 func (f *fsm) applyToSingle(s description.Server) description.Server {
223 switch s.Kind {
224 case description.Unknown:
225 f.replaceServer(s)
226 case description.Standalone, description.Mongos:
227 if f.SetName != "" {
228 f.removeServerByAddr(s.Addr)
229 return s
230 }
231
232 f.replaceServer(s)
233 case description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
234
235
236
237
238
239
240 if f.SetName != "" && f.SetName != s.SetName {
241 s = description.Server{
242 Addr: s.Addr,
243 Kind: description.Unknown,
244 }
245 }
246
247 f.replaceServer(s)
248 }
249
250 return s
251 }
252
253 func (f *fsm) applyToUnknown(s description.Server) description.Server {
254 switch s.Kind {
255 case description.Mongos:
256 f.setKind(description.Sharded)
257 f.replaceServer(s)
258 case description.RSPrimary:
259 f.updateRSFromPrimary(s)
260 case description.RSSecondary, description.RSArbiter, description.RSMember:
261 f.setKind(description.ReplicaSetNoPrimary)
262 f.updateRSWithoutPrimary(s)
263 case description.Standalone:
264 f.updateUnknownWithStandalone(s)
265 case description.Unknown, description.RSGhost:
266 f.replaceServer(s)
267 }
268
269 return s
270 }
271
272 func (f *fsm) checkIfHasPrimary() {
273 if _, ok := f.findPrimary(); ok {
274 f.setKind(description.ReplicaSetWithPrimary)
275 } else {
276 f.setKind(description.ReplicaSetNoPrimary)
277 }
278 }
279
280
281 func hasStalePrimary(fsm fsm, srv description.Server) bool {
282
283 compRes := bytes.Compare(srv.ElectionID[:], fsm.maxElectionID[:])
284
285 if wireVersion := srv.WireVersion; wireVersion != nil && wireVersion.Max >= 17 {
286
287
288
289
290 return compRes == -1 || (compRes != 1 && srv.SetVersion < fsm.maxSetVersion)
291 }
292
293
294
295
296 return compRes == -1 || fsm.maxSetVersion > srv.SetVersion
297 }
298
299
300
301
302 func transferEVTuple(srv description.Server, fsm *fsm) bool {
303 stalePrimary := hasStalePrimary(*fsm, srv)
304
305 if wireVersion := srv.WireVersion; wireVersion != nil && wireVersion.Max >= 17 {
306 if stalePrimary {
307 fsm.checkIfHasPrimary()
308 return false
309 }
310
311 fsm.maxElectionID = srv.ElectionID
312 fsm.maxSetVersion = srv.SetVersion
313
314 return true
315 }
316
317 if srv.SetVersion != 0 && !srv.ElectionID.IsZero() {
318 if stalePrimary {
319 fsm.replaceServer(description.Server{
320 Addr: srv.Addr,
321 LastError: fmt.Errorf(
322 "was a primary, but its set version or election id is stale"),
323 })
324
325 fsm.checkIfHasPrimary()
326
327 return false
328 }
329
330 fsm.maxElectionID = srv.ElectionID
331 }
332
333 if srv.SetVersion > fsm.maxSetVersion {
334 fsm.maxSetVersion = srv.SetVersion
335 }
336
337 return true
338 }
339
340 func (f *fsm) updateRSFromPrimary(srv description.Server) {
341 if f.SetName == "" {
342 f.SetName = srv.SetName
343 } else if f.SetName != srv.SetName {
344 f.removeServerByAddr(srv.Addr)
345 f.checkIfHasPrimary()
346
347 return
348 }
349
350 if ok := transferEVTuple(srv, f); !ok {
351 return
352 }
353
354 if j, ok := f.findPrimary(); ok {
355 f.setServer(j, description.Server{
356 Addr: f.Servers[j].Addr,
357 LastError: fmt.Errorf("was a primary, but a new primary was discovered"),
358 })
359 }
360
361 f.replaceServer(srv)
362
363 for j := len(f.Servers) - 1; j >= 0; j-- {
364 found := false
365 for _, member := range srv.Members {
366 if member == f.Servers[j].Addr {
367 found = true
368 break
369 }
370 }
371
372 if !found {
373 f.removeServer(j)
374 }
375 }
376
377 for _, member := range srv.Members {
378 if _, ok := f.findServer(member); !ok {
379 f.addServer(member)
380 }
381 }
382
383 f.checkIfHasPrimary()
384 }
385
386 func (f *fsm) updateRSWithPrimaryFromMember(s description.Server) {
387 if f.SetName != s.SetName {
388 f.removeServerByAddr(s.Addr)
389 f.checkIfHasPrimary()
390 return
391 }
392
393 if s.Addr != s.CanonicalAddr {
394 f.removeServerByAddr(s.Addr)
395 f.checkIfHasPrimary()
396 return
397 }
398
399 f.replaceServer(s)
400
401 if _, ok := f.findPrimary(); !ok {
402 f.setKind(description.ReplicaSetNoPrimary)
403 }
404 }
405
406 func (f *fsm) updateRSWithoutPrimary(s description.Server) {
407 if f.SetName == "" {
408 f.SetName = s.SetName
409 } else if f.SetName != s.SetName {
410 f.removeServerByAddr(s.Addr)
411 return
412 }
413
414 for _, member := range s.Members {
415 if _, ok := f.findServer(member); !ok {
416 f.addServer(member)
417 }
418 }
419
420 if s.Addr != s.CanonicalAddr {
421 f.removeServerByAddr(s.Addr)
422 return
423 }
424
425 f.replaceServer(s)
426 }
427
428 func (f *fsm) updateUnknownWithStandalone(s description.Server) {
429 if len(f.Servers) > 1 {
430 f.removeServerByAddr(s.Addr)
431 return
432 }
433
434 f.setKind(description.Single)
435 f.replaceServer(s)
436 }
437
438 func (f *fsm) addServer(addr address.Address) {
439 f.Servers = append(f.Servers, description.Server{
440 Addr: addr.Canonicalize(),
441 })
442 }
443
444 func (f *fsm) findPrimary() (int, bool) {
445 for i, s := range f.Servers {
446 if s.Kind == description.RSPrimary {
447 return i, true
448 }
449 }
450
451 return 0, false
452 }
453
454 func (f *fsm) findServer(addr address.Address) (int, bool) {
455 canon := addr.Canonicalize()
456 for i, s := range f.Servers {
457 if canon == s.Addr {
458 return i, true
459 }
460 }
461
462 return 0, false
463 }
464
465 func (f *fsm) removeServer(i int) {
466 f.Servers = append(f.Servers[:i], f.Servers[i+1:]...)
467 }
468
469 func (f *fsm) removeServerByAddr(addr address.Address) {
470 if i, ok := f.findServer(addr); ok {
471 f.removeServer(i)
472 }
473 }
474
475 func (f *fsm) replaceServer(s description.Server) {
476 if i, ok := f.findServer(s.Addr); ok {
477 f.setServer(i, s)
478 }
479 }
480
481 func (f *fsm) setServer(i int, s description.Server) {
482 f.Servers[i] = s
483 }
484
485 func (f *fsm) setKind(k description.TopologyKind) {
486 f.Kind = k
487 }
488
View as plain text