1
2
3
4
5
6
7 package topology
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "net"
14 "sync"
15 "sync/atomic"
16 "time"
17
18 "go.mongodb.org/mongo-driver/bson"
19 "go.mongodb.org/mongo-driver/bson/primitive"
20 "go.mongodb.org/mongo-driver/event"
21 "go.mongodb.org/mongo-driver/internal/driverutil"
22 "go.mongodb.org/mongo-driver/internal/logger"
23 "go.mongodb.org/mongo-driver/mongo/address"
24 "go.mongodb.org/mongo-driver/mongo/description"
25 "go.mongodb.org/mongo-driver/x/mongo/driver"
26 "go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
27 "go.mongodb.org/mongo-driver/x/mongo/driver/operation"
28 )
29
const (
	// minHeartbeatInterval is the period of the rate-limiting ticker that
	// update waits on between two consecutive server checks.
	minHeartbeatInterval = 500 * time.Millisecond

	// wireVersion42 is the wire-version threshold used by ProcessError: when
	// the connection's maximum wire version is below it, a state-change error
	// also clears the connection pool.
	wireVersion42 = 8
)
32
33
// Lifecycle states of a Server. The current state is kept in Server.state
// and is read and written with sync/atomic.
const (
	serverDisconnected int64 = iota
	serverDisconnecting
	serverConnected
)

// serverStateString returns the human-readable name of state. It returns the
// empty string when state is not one of the known server states.
func serverStateString(state int64) string {
	names := [...]string{
		serverDisconnected:  "Disconnected",
		serverDisconnecting: "Disconnecting",
		serverConnected:     "Connected",
	}
	if state < 0 || state >= int64(len(names)) {
		return ""
	}
	return names[state]
}
52
53 var (
54
55
56 ErrServerClosed = errors.New("server is closed")
57
58
59 ErrServerConnected = errors.New("server is connected")
60
61 errCheckCancelled = errors.New("server check cancelled")
62 emptyDescription = description.NewDefaultServer("")
63 )
64
65
66
67 type SelectedServer struct {
68 *Server
69
70 Kind description.TopologyKind
71 }
72
73
74 func (ss *SelectedServer) Description() description.SelectedServer {
75 sdesc := ss.Server.Description()
76 return description.SelectedServer{
77 Server: sdesc,
78 Kind: ss.Kind,
79 }
80 }
81
82
83 type Server struct {
84
85
86
87
88
89 state int64
90 operationCount int64
91
92 cfg *serverConfig
93 address address.Address
94
95
96 pool *pool
97
98
99 done chan struct{}
100 checkNow chan struct{}
101 disconnecting chan struct{}
102 closewg sync.WaitGroup
103
104
105 desc atomic.Value
106 updateTopologyCallback atomic.Value
107 topologyID primitive.ObjectID
108
109
110 subLock sync.Mutex
111 subscribers map[uint64]chan description.Server
112 currentSubscriberID uint64
113 subscriptionsClosed bool
114
115
116
117
118
119 heartbeatLock sync.Mutex
120 conn *connection
121 globalCtx context.Context
122 globalCtxCancel context.CancelFunc
123 heartbeatCtx context.Context
124 heartbeatCtxCancel context.CancelFunc
125
126 processErrorLock sync.Mutex
127 rttMonitor *rttMonitor
128 }
129
130
131
132
133 type updateTopologyCallback func(description.Server) description.Server
134
135
136
137 func ConnectServer(
138 addr address.Address,
139 updateCallback updateTopologyCallback,
140 topologyID primitive.ObjectID,
141 opts ...ServerOption,
142 ) (*Server, error) {
143 srvr := NewServer(addr, topologyID, opts...)
144 err := srvr.Connect(updateCallback)
145 if err != nil {
146 return nil, err
147 }
148 return srvr, nil
149 }
150
151
152
153 func NewServer(addr address.Address, topologyID primitive.ObjectID, opts ...ServerOption) *Server {
154 cfg := newServerConfig(opts...)
155 globalCtx, globalCtxCancel := context.WithCancel(context.Background())
156 s := &Server{
157 state: serverDisconnected,
158
159 cfg: cfg,
160 address: addr,
161
162 done: make(chan struct{}),
163 checkNow: make(chan struct{}, 1),
164 disconnecting: make(chan struct{}),
165
166 topologyID: topologyID,
167
168 subscribers: make(map[uint64]chan description.Server),
169 globalCtx: globalCtx,
170 globalCtxCancel: globalCtxCancel,
171 }
172 s.desc.Store(description.NewDefaultServer(addr))
173 rttCfg := &rttConfig{
174 interval: cfg.heartbeatInterval,
175 minRTTWindow: 5 * time.Minute,
176 createConnectionFn: s.createConnection,
177 createOperationFn: s.createBaseOperation,
178 }
179 s.rttMonitor = newRTTMonitor(rttCfg)
180
181 pc := poolConfig{
182 Address: addr,
183 MinPoolSize: cfg.minConns,
184 MaxPoolSize: cfg.maxConns,
185 MaxConnecting: cfg.maxConnecting,
186 MaxIdleTime: cfg.poolMaxIdleTime,
187 MaintainInterval: cfg.poolMaintainInterval,
188 LoadBalanced: cfg.loadBalanced,
189 PoolMonitor: cfg.poolMonitor,
190 Logger: cfg.logger,
191 handshakeErrFn: s.ProcessHandshakeError,
192 }
193
194 connectionOpts := copyConnectionOpts(cfg.connectionOpts)
195 s.pool = newPool(pc, connectionOpts...)
196 s.publishServerOpeningEvent(s.address)
197
198 return s
199 }
200
201 func mustLogServerMessage(srv *Server) bool {
202 return srv.cfg.logger != nil && srv.cfg.logger.LevelComponentEnabled(
203 logger.LevelDebug, logger.ComponentTopology)
204 }
205
206 func logServerMessage(srv *Server, msg string, keysAndValues ...interface{}) {
207 serverHost, serverPort, err := net.SplitHostPort(srv.address.String())
208 if err != nil {
209 serverHost = srv.address.String()
210 serverPort = ""
211 }
212
213 var driverConnectionID uint64
214 var serverConnectionID *int64
215
216 if srv.conn != nil {
217 driverConnectionID = srv.conn.driverConnectionID
218 serverConnectionID = srv.conn.serverConnectionID
219 }
220
221 srv.cfg.logger.Print(logger.LevelDebug,
222 logger.ComponentTopology,
223 msg,
224 logger.SerializeServer(logger.Server{
225 DriverConnectionID: driverConnectionID,
226 TopologyID: srv.topologyID,
227 Message: msg,
228 ServerConnectionID: serverConnectionID,
229 ServerHost: serverHost,
230 ServerPort: serverPort,
231 }, keysAndValues...)...)
232 }
233
234
235
236 func (s *Server) Connect(updateCallback updateTopologyCallback) error {
237 if !atomic.CompareAndSwapInt64(&s.state, serverDisconnected, serverConnected) {
238 return ErrServerConnected
239 }
240
241 desc := description.NewDefaultServer(s.address)
242 if s.cfg.loadBalanced {
243
244 desc.Kind = description.LoadBalancer
245 }
246 s.desc.Store(desc)
247 s.updateTopologyCallback.Store(updateCallback)
248
249 if !s.cfg.monitoringDisabled && !s.cfg.loadBalanced {
250 s.closewg.Add(1)
251 go s.update()
252 }
253
254
255
256
257
258
259
260
261 return s.pool.ready()
262 }
263
264
265
266
267
268
269
270
271
272
273 func (s *Server) Disconnect(ctx context.Context) error {
274 if !atomic.CompareAndSwapInt64(&s.state, serverConnected, serverDisconnecting) {
275 return ErrServerClosed
276 }
277
278 s.updateTopologyCallback.Store((updateTopologyCallback)(nil))
279
280
281
282
283
284 s.globalCtxCancel()
285 close(s.done)
286 s.cancelCheck()
287
288 s.rttMonitor.disconnect()
289 s.pool.close(ctx)
290
291 s.closewg.Wait()
292 atomic.StoreInt64(&s.state, serverDisconnected)
293
294 return nil
295 }
296
297
298 func (s *Server) Connection(ctx context.Context) (driver.Connection, error) {
299 if atomic.LoadInt64(&s.state) != serverConnected {
300 return nil, ErrServerClosed
301 }
302
303
304
305
306 atomic.AddInt64(&s.operationCount, 1)
307 conn, err := s.pool.checkOut(ctx)
308 if err != nil {
309 atomic.AddInt64(&s.operationCount, -1)
310 return nil, err
311 }
312
313 return &Connection{
314 connection: conn,
315 cleanupServerFn: func() {
316
317
318
319
320
321
322 atomic.AddInt64(&s.operationCount, -1)
323 },
324 }, nil
325 }
326
327
328
329 func (s *Server) ProcessHandshakeError(err error, startingGenerationNumber uint64, serviceID *primitive.ObjectID) {
330
331
332
333 if err == nil || s.cfg.loadBalanced && serviceID == nil {
334 return
335 }
336
337 if generation, _ := s.pool.generation.getGeneration(serviceID); startingGenerationNumber < generation {
338 return
339 }
340
341
342
343 wrappedConnErr := unwrapConnectionError(err)
344 if wrappedConnErr == nil {
345 return
346 }
347
348
349
350
351 s.processErrorLock.Lock()
352 defer s.processErrorLock.Unlock()
353
354
355
356
357 s.updateDescription(description.NewServerFromError(s.address, wrappedConnErr, nil))
358 s.pool.clear(err, serviceID)
359 s.cancelCheck()
360 }
361
362
363 func (s *Server) Description() description.Server {
364 return s.desc.Load().(description.Server)
365 }
366
367
368
369
370 func (s *Server) SelectedDescription() description.SelectedServer {
371 sdesc := s.Description()
372 return description.SelectedServer{
373 Server: sdesc,
374 Kind: description.Single,
375 }
376 }
377
378
379
380
381 func (s *Server) Subscribe() (*ServerSubscription, error) {
382 if atomic.LoadInt64(&s.state) != serverConnected {
383 return nil, ErrSubscribeAfterClosed
384 }
385 ch := make(chan description.Server, 1)
386 ch <- s.desc.Load().(description.Server)
387
388 s.subLock.Lock()
389 defer s.subLock.Unlock()
390 if s.subscriptionsClosed {
391 return nil, ErrSubscribeAfterClosed
392 }
393 id := s.currentSubscriberID
394 s.subscribers[id] = ch
395 s.currentSubscriberID++
396
397 ss := &ServerSubscription{
398 C: ch,
399 s: s,
400 id: id,
401 }
402
403 return ss, nil
404 }
405
406
407
408 func (s *Server) RequestImmediateCheck() {
409 select {
410 case s.checkNow <- struct{}{}:
411 default:
412 }
413 }
414
415
416
417
418 func getWriteConcernErrorForProcessing(err error) (*driver.WriteConcernError, bool) {
419 var writeCmdErr driver.WriteCommandError
420 if !errors.As(err, &writeCmdErr) {
421 return nil, false
422 }
423
424 wcerr := writeCmdErr.WriteConcernError
425 if wcerr != nil && (wcerr.NodeIsRecovering() || wcerr.NotPrimary()) {
426 return wcerr, true
427 }
428 return nil, false
429 }
430
431
432 func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult {
433
434 if err == nil {
435 return driver.NoChange
436 }
437
438
439
440
441
442
443 if conn.Stale() {
444 return driver.NoChange
445 }
446
447
448
449
450 s.processErrorLock.Lock()
451 defer s.processErrorLock.Unlock()
452
453
454
455
456 connDesc := conn.Description()
457 wireVersion := connDesc.WireVersion
458 serviceID := connDesc.ServiceID
459
460
461
462 serverDesc := s.desc.Load().(description.Server)
463 topologyVersion := serverDesc.TopologyVersion
464
465
466
467
468
469
470
471
472
473
474
475
476 if tv := connDesc.TopologyVersion; tv != nil && topologyVersion.CompareToIncoming(tv) < 0 {
477 topologyVersion = tv
478 }
479
480
481
482 if cerr, ok := err.(driver.Error); ok && (cerr.NodeIsRecovering() || cerr.NotPrimary()) {
483
484 if topologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 {
485 return driver.NoChange
486 }
487
488
489 s.updateDescription(description.NewServerFromError(s.address, err, cerr.TopologyVersion))
490 s.RequestImmediateCheck()
491
492 res := driver.ServerMarkedUnknown
493
494 if cerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 {
495 res = driver.ConnectionPoolCleared
496 s.pool.clear(err, serviceID)
497 }
498
499 return res
500 }
501 if wcerr, ok := getWriteConcernErrorForProcessing(err); ok {
502
503 if topologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 {
504 return driver.NoChange
505 }
506
507
508 s.updateDescription(description.NewServerFromError(s.address, err, wcerr.TopologyVersion))
509 s.RequestImmediateCheck()
510
511 res := driver.ServerMarkedUnknown
512
513 if wcerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 {
514 res = driver.ConnectionPoolCleared
515 s.pool.clear(err, serviceID)
516 }
517 return res
518 }
519
520 wrappedConnErr := unwrapConnectionError(err)
521 if wrappedConnErr == nil {
522 return driver.NoChange
523 }
524
525
526 if netErr, ok := wrappedConnErr.(net.Error); ok && netErr.Timeout() {
527 return driver.NoChange
528 }
529 if errors.Is(wrappedConnErr, context.Canceled) || errors.Is(wrappedConnErr, context.DeadlineExceeded) {
530 return driver.NoChange
531 }
532
533
534
535
536 s.updateDescription(description.NewServerFromError(s.address, err, nil))
537 s.pool.clear(err, serviceID)
538 s.cancelCheck()
539 return driver.ConnectionPoolCleared
540 }
541
542
543
// update is the monitoring loop started by Connect in its own goroutine. It
// repeatedly calls check, publishes the resulting description, clears the
// pool on failed checks, and then either loops immediately or waits for the
// next heartbeat tick / explicit check request. It exits once s.done is
// closed, after closing all subscriber channels and the heartbeat connection.
func (s *Server) update() {
	defer s.closewg.Done()
	heartbeatTicker := time.NewTicker(s.cfg.heartbeatInterval)
	rateLimiter := time.NewTicker(minHeartbeatInterval)
	defer heartbeatTicker.Stop()
	defer rateLimiter.Stop()
	checkNow := s.checkNow
	done := s.done

	// Deferred last so it runs first when the function unwinds.
	// NOTE(review): presumably recovers and logs panics — confirm against
	// logUnexpectedFailure.
	defer logUnexpectedFailure(s.cfg.logger, "Encountered unexpected failure updating server")

	// closeServer closes and removes every subscriber channel, marks
	// subscriptions as closed, and closes the heartbeat connection if any.
	closeServer := func() {
		s.subLock.Lock()
		for id, c := range s.subscribers {
			close(c)
			delete(s.subscribers, id)
		}
		s.subscriptionsClosed = true
		s.subLock.Unlock()

		// The error from closing the heartbeat connection is deliberately
		// ignored; the server is shutting down.
		if s.conn != nil {
			_ = s.conn.close()
		}
	}

	// waitUntilNextCheck blocks until a heartbeat tick or an immediate-check
	// request arrives, and then until the rate limiter ticks. It returns
	// early from either wait when done is closed.
	waitUntilNextCheck := func() {
		// Wait for the heartbeat interval or an explicit request.
		select {
		case <-heartbeatTicker.C:
		case <-checkNow:
		case <-done:
			// Return so the main loop observes done and shuts down.
			return
		}

		// Enforce the minimum spacing between checks.
		select {
		case <-rateLimiter.C:
		case <-done:
			return
		}
	}

	// timeoutCnt counts consecutive heartbeat timeouts/cancellations that
	// were tolerated without clearing the pool (at most one).
	timeoutCnt := 0
	for {
		// Non-blocking shutdown check at the top of every iteration.
		select {
		case <-done:
			closeServer()
			return
		default:
		}

		previousDescription := s.Description()

		// Run one check (connection setup + handshake, or a hello on the
		// existing heartbeat connection).
		desc, err := s.check()
		if errors.Is(err, errCheckCancelled) {
			// If the server is no longer connected, loop back so the done
			// check above can shut the routine down.
			if atomic.LoadInt64(&s.state) != serverConnected {
				continue
			}

			// The check was cancelled while still connected (cancelCheck was
			// called, e.g. after an application network error). Wait before
			// checking again.
			waitUntilNextCheck()
			continue
		}

		// Publish the description and handle errors under processErrorLock
		// so this cannot interleave with ProcessError or
		// ProcessHandshakeError. A true result means "retry immediately".
		if isShortcut := func() bool {
			s.processErrorLock.Lock()
			defer s.processErrorLock.Unlock()

			s.updateDescription(desc)

			// The first timeout or cancellation is tolerated: count it and
			// retry right away without clearing the pool.
			if err := unwrapConnectionError(desc.LastError); err != nil && timeoutCnt < 1 {
				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
					timeoutCnt++
					// Retry the check immediately.
					return true
				}
				if err, ok := err.(net.Error); ok && err.Timeout() {
					timeoutCnt++
					// Retry the check immediately.
					return true
				}
			}
			if err := desc.LastError; err != nil {
				// A failure that follows a tolerated timeout clears the pool
				// with clearAll; any other failed check uses clear.
				if timeoutCnt > 0 {
					s.pool.clearAll(err, nil)
				} else {
					s.pool.clear(err, nil)
				}
			}

			// Either the check succeeded or the pool was cleared; start
			// counting timeouts afresh.
			timeoutCnt = 0
			return false
		}(); isShortcut {
			continue
		}

		// Decide whether to check again immediately or to wait.
		connectionIsStreaming := s.conn != nil && s.conn.getCurrentlyStreaming()
		transitionedFromNetworkError := desc.LastError != nil && unwrapConnectionError(desc.LastError) != nil &&
			previousDescription.Kind != description.Unknown

		// Start the RTT monitor once the server supports streaming and
		// streaming is enabled.
		if isStreamingEnabled(s) && isStreamable(s) && !s.rttMonitor.started {
			s.rttMonitor.connect()
		}

		// Check again without waiting when the server is streamable, the
		// connection is mid-stream, or a previously known server just failed
		// with a network error.
		if isStreamable(s) || connectionIsStreaming || transitionedFromNetworkError {
			continue
		}

		// Otherwise wait for the next heartbeat tick or an immediate-check
		// request.
		waitUntilNextCheck()
	}
}
677
678
679
680
681
682 func (s *Server) updateDescription(desc description.Server) {
683 if s.cfg.loadBalanced {
684
685
686 return
687 }
688
689 defer logUnexpectedFailure(s.cfg.logger, "Encountered unexpected failure updating server description")
690
691
692
693
694
695
696
697
698
699 if desc.Kind != description.Unknown {
700 _ = s.pool.ready()
701 }
702
703
704 callback, ok := s.updateTopologyCallback.Load().(updateTopologyCallback)
705 if ok && callback != nil {
706 desc = callback(desc)
707 }
708 s.desc.Store(desc)
709
710 s.subLock.Lock()
711 for _, c := range s.subscribers {
712 select {
713
714 case <-c:
715 default:
716 }
717 c <- desc
718 }
719 s.subLock.Unlock()
720 }
721
722
723
724 func (s *Server) createConnection() *connection {
725 opts := copyConnectionOpts(s.cfg.connectionOpts)
726 opts = append(opts,
727 WithConnectTimeout(func(time.Duration) time.Duration { return s.cfg.heartbeatTimeout }),
728 WithReadTimeout(func(time.Duration) time.Duration { return s.cfg.heartbeatTimeout }),
729 WithWriteTimeout(func(time.Duration) time.Duration { return s.cfg.heartbeatTimeout }),
730
731
732 WithHandshaker(func(h Handshaker) Handshaker {
733 return operation.NewHello().AppName(s.cfg.appname).Compressors(s.cfg.compressionOpts).
734 ServerAPI(s.cfg.serverAPI)
735 }),
736
737 WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor { return nil }),
738 )
739
740 return newConnection(s.address, opts...)
741 }
742
743 func copyConnectionOpts(opts []ConnectionOption) []ConnectionOption {
744 optsCopy := make([]ConnectionOption, len(opts))
745 copy(optsCopy, opts)
746 return optsCopy
747 }
748
749 func (s *Server) setupHeartbeatConnection() error {
750 conn := s.createConnection()
751
752
753 s.heartbeatLock.Lock()
754 if s.heartbeatCtxCancel != nil {
755
756 s.heartbeatCtxCancel()
757 }
758 s.heartbeatCtx, s.heartbeatCtxCancel = context.WithCancel(s.globalCtx)
759 s.conn = conn
760 s.heartbeatLock.Unlock()
761
762 return s.conn.connect(s.heartbeatCtx)
763 }
764
765
766 func (s *Server) cancelCheck() {
767 var conn *connection
768
769
770 s.heartbeatLock.Lock()
771 if s.heartbeatCtx != nil {
772 s.heartbeatCtxCancel()
773 }
774 conn = s.conn
775 s.heartbeatLock.Unlock()
776
777 if conn == nil {
778 return
779 }
780
781
782
783
784 conn.closeConnectContext()
785 conn.wait()
786 _ = conn.close()
787 }
788
789 func (s *Server) checkWasCancelled() bool {
790 return s.heartbeatCtx.Err() != nil
791 }
792
793 func (s *Server) createBaseOperation(conn driver.Connection) *operation.Hello {
794 return operation.
795 NewHello().
796 ClusterClock(s.cfg.clock).
797 Deployment(driver.SingleConnectionDeployment{C: conn}).
798 ServerAPI(s.cfg.serverAPI)
799 }
800
801 func isStreamingEnabled(srv *Server) bool {
802 switch srv.cfg.serverMonitoringMode {
803 case connstring.ServerMonitoringModeStream:
804 return true
805 case connstring.ServerMonitoringModePoll:
806 return false
807 default:
808 return driverutil.GetFaasEnvName() == ""
809 }
810 }
811
812 func isStreamable(srv *Server) bool {
813 return srv.Description().Kind != description.Unknown && srv.Description().TopologyVersion != nil
814 }
815
// check performs a single server check and returns the resulting description.
//
// When there is no usable heartbeat connection (none yet, closed, or the
// previous check was cancelled) it creates a new connection and uses the
// handshake result. Otherwise it runs a hello on the existing connection,
// either reading the next streamed response, starting a streaming hello, or
// issuing a plain hello.
//
// On success the description is returned with the average RTT and heartbeat
// interval filled in. If the check failed and the heartbeat context was
// cancelled, it returns emptyDescription and errCheckCancelled. For any other
// failure it resets the RTT monitor and returns a description built from the
// error with a nil error.
func (s *Server) check() (description.Server, error) {
	var descPtr *description.Server
	var err error
	var duration time.Duration

	start := time.Now()

	// Create a new connection if this is the first check, the connection was
	// closed after an error during the previous check, or the previous check
	// was cancelled.
	if s.conn == nil || s.conn.closed() || s.checkWasCancelled() {
		// "0" is reported as the connection ID while no connection exists.
		connID := "0"
		if s.conn != nil {
			connID = s.conn.ID()
		}
		s.publishServerHeartbeatStartedEvent(connID, false)

		// Establish the connection and run the handshake.
		err = s.setupHeartbeatConnection()
		duration = time.Since(start)
		// Re-read the ID: setupHeartbeatConnection replaced s.conn.
		connID = "0"
		if s.conn != nil {
			connID = s.conn.ID()
		}
		if err == nil {
			// Use the description from the connection handshake as the
			// result of this check, and its hello RTT as an RTT sample.
			s.rttMonitor.addSample(s.conn.helloRTT)
			descPtr = &s.conn.desc
			s.publishServerHeartbeatSucceededEvent(connID, duration, s.conn.desc, false)
		} else {
			// NOTE(review): unwrapConnectionError returns nil for errors that
			// are not connection errors, so err may be nil from here on —
			// confirm that setupHeartbeatConnection only returns connection
			// errors.
			err = unwrapConnectionError(err)
			s.publishServerHeartbeatFailedEvent(connID, duration, err, false)
		}
	} else {
		// An existing connection is available: run a hello on it. The
		// connection is wrapped in initConnection for the operation.
		heartbeatConn := initConnection{s.conn}
		baseOperation := s.createBaseOperation(heartbeatConn)
		previousDescription := s.Description()
		streamable := isStreamingEnabled(s) && isStreamable(s)

		s.publishServerHeartbeatStartedEvent(s.conn.ID(), s.conn.getCurrentlyStreaming() || streamable)

		switch {
		case s.conn.getCurrentlyStreaming():
			// The connection is already streaming; read the next response.
			err = baseOperation.StreamResponse(s.heartbeatCtx, heartbeatConn)
		case streamable:
			// The server supports streaming but the connection is not yet
			// streaming: send an awaitable hello carrying the last known
			// topology version.

			// maxAwaitTimeMS is the heartbeat interval in milliseconds
			// (nanoseconds / 1e6).
			maxAwaitTimeMS := int64(s.cfg.heartbeatInterval) / 1e6

			// The server may hold the response for up to maxAwaitTimeMS, so a
			// non-zero socket timeout is extended by the heartbeat interval.
			// A zero timeout is left at zero.
			socketTimeout := s.cfg.heartbeatTimeout
			if socketTimeout != 0 {
				socketTimeout += s.cfg.heartbeatInterval
			}
			s.conn.setSocketTimeout(socketTimeout)
			baseOperation = baseOperation.TopologyVersion(previousDescription.TopologyVersion).
				MaxAwaitTimeMS(maxAwaitTimeMS)
			s.conn.setCanStream(true)
			err = baseOperation.Execute(s.heartbeatCtx)
		default:
			// Streaming is not possible: run a regular hello bounded by the
			// heartbeat timeout.
			s.conn.setSocketTimeout(s.cfg.heartbeatTimeout)
			err = baseOperation.Execute(s.heartbeatCtx)
		}

		duration = time.Since(start)

		// Only non-streamable checks contribute an RTT sample here; the
		// duration of an awaited hello includes server-side wait time.
		if !streamable {
			s.rttMonitor.addSample(duration)
		}

		if err == nil {
			tempDesc := baseOperation.Result(s.address)
			descPtr = &tempDesc
			s.publishServerHeartbeatSucceededEvent(s.conn.ID(), duration, tempDesc, s.conn.getCurrentlyStreaming() || streamable)
		} else {
			// Close the connection so the next check creates a new one. The
			// close error is intentionally ignored.
			if s.conn != nil {
				_ = s.conn.close()
			}
			s.publishServerHeartbeatFailedEvent(s.conn.ID(), duration, err, s.conn.getCurrentlyStreaming() || streamable)
		}
	}

	if descPtr != nil {
		// The check succeeded: work on a copy and add the RTT average and
		// heartbeat interval.
		desc := *descPtr
		desc = desc.SetAverageRTT(s.rttMonitor.EWMA())
		desc.HeartbeatInterval = s.cfg.heartbeatInterval
		return desc, nil
	}

	if s.checkWasCancelled() {
		// The failure was caused by cancelCheck (or shutdown) cancelling the
		// heartbeat context; report that instead of a server error.
		return emptyDescription, errCheckCancelled
	}

	// A genuine failure: discard the collected RTT samples and describe the
	// server from the error, keeping any topology version the error carries.
	topologyVersion := extractTopologyVersion(err)
	s.rttMonitor.reset()
	return description.NewServerFromError(s.address, err, topologyVersion), nil
}
932
933 func extractTopologyVersion(err error) *description.TopologyVersion {
934 if ce, ok := err.(ConnectionError); ok {
935 err = ce.Wrapped
936 }
937
938 switch converted := err.(type) {
939 case driver.Error:
940 return converted.TopologyVersion
941 case driver.WriteCommandError:
942 if converted.WriteConcernError != nil {
943 return converted.WriteConcernError.TopologyVersion
944 }
945 }
946
947 return nil
948 }
949
950
951 func (s *Server) RTTMonitor() driver.RTTMonitor {
952 return s.rttMonitor
953 }
954
955
956 func (s *Server) OperationCount() int64 {
957 return atomic.LoadInt64(&s.operationCount)
958 }
959
960
961 func (s *Server) String() string {
962 desc := s.Description()
963 state := atomic.LoadInt64(&s.state)
964 str := fmt.Sprintf("Addr: %s, Type: %s, State: %s",
965 s.address, desc.Kind, serverStateString(state))
966 if len(desc.Tags) != 0 {
967 str += fmt.Sprintf(", Tag sets: %s", desc.Tags)
968 }
969 if state == serverConnected {
970 str += fmt.Sprintf(", Average RTT: %s, Min RTT: %s", desc.AverageRTT, s.RTTMonitor().Min())
971 }
972 if desc.LastError != nil {
973 str += fmt.Sprintf(", Last error: %s", desc.LastError)
974 }
975
976 return str
977 }
978
979
980
981 type ServerSubscription struct {
982 C <-chan description.Server
983 s *Server
984 id uint64
985 }
986
987
988
989 func (ss *ServerSubscription) Unsubscribe() error {
990 ss.s.subLock.Lock()
991 defer ss.s.subLock.Unlock()
992 if ss.s.subscriptionsClosed {
993 return nil
994 }
995
996 ch, ok := ss.s.subscribers[ss.id]
997 if !ok {
998 return nil
999 }
1000
1001 close(ch)
1002 delete(ss.s.subscribers, ss.id)
1003
1004 return nil
1005 }
1006
1007
1008 func (s *Server) publishServerOpeningEvent(addr address.Address) {
1009 if s == nil {
1010 return
1011 }
1012
1013 serverOpening := &event.ServerOpeningEvent{
1014 Address: addr,
1015 TopologyID: s.topologyID,
1016 }
1017
1018 if s.cfg.serverMonitor != nil && s.cfg.serverMonitor.ServerOpening != nil {
1019 s.cfg.serverMonitor.ServerOpening(serverOpening)
1020 }
1021
1022 if mustLogServerMessage(s) {
1023 logServerMessage(s, logger.TopologyServerOpening)
1024 }
1025 }
1026
1027
1028 func (s *Server) publishServerHeartbeatStartedEvent(connectionID string, await bool) {
1029 serverHeartbeatStarted := &event.ServerHeartbeatStartedEvent{
1030 ConnectionID: connectionID,
1031 Awaited: await,
1032 }
1033
1034 if s != nil && s.cfg.serverMonitor != nil && s.cfg.serverMonitor.ServerHeartbeatStarted != nil {
1035 s.cfg.serverMonitor.ServerHeartbeatStarted(serverHeartbeatStarted)
1036 }
1037
1038 if mustLogServerMessage(s) {
1039 logServerMessage(s, logger.TopologyServerHeartbeatStarted,
1040 logger.KeyAwaited, await)
1041 }
1042 }
1043
1044
1045 func (s *Server) publishServerHeartbeatSucceededEvent(connectionID string,
1046 duration time.Duration,
1047 desc description.Server,
1048 await bool,
1049 ) {
1050 serverHeartbeatSucceeded := &event.ServerHeartbeatSucceededEvent{
1051 DurationNanos: duration.Nanoseconds(),
1052 Duration: duration,
1053 Reply: desc,
1054 ConnectionID: connectionID,
1055 Awaited: await,
1056 }
1057
1058 if s != nil && s.cfg.serverMonitor != nil && s.cfg.serverMonitor.ServerHeartbeatSucceeded != nil {
1059 s.cfg.serverMonitor.ServerHeartbeatSucceeded(serverHeartbeatSucceeded)
1060 }
1061
1062 if mustLogServerMessage(s) {
1063 descRaw, _ := bson.Marshal(struct {
1064 description.Server `bson:",inline"`
1065 Ok int32
1066 }{
1067 Server: desc,
1068 Ok: func() int32 {
1069 if desc.LastError != nil {
1070 return 0
1071 }
1072
1073 return 1
1074 }(),
1075 })
1076
1077 logServerMessage(s, logger.TopologyServerHeartbeatSucceeded,
1078 logger.KeyAwaited, await,
1079 logger.KeyDurationMS, duration.Milliseconds(),
1080 logger.KeyReply, bson.Raw(descRaw).String())
1081 }
1082 }
1083
1084
1085 func (s *Server) publishServerHeartbeatFailedEvent(connectionID string,
1086 duration time.Duration,
1087 err error,
1088 await bool,
1089 ) {
1090 serverHeartbeatFailed := &event.ServerHeartbeatFailedEvent{
1091 DurationNanos: duration.Nanoseconds(),
1092 Duration: duration,
1093 Failure: err,
1094 ConnectionID: connectionID,
1095 Awaited: await,
1096 }
1097
1098 if s != nil && s.cfg.serverMonitor != nil && s.cfg.serverMonitor.ServerHeartbeatFailed != nil {
1099 s.cfg.serverMonitor.ServerHeartbeatFailed(serverHeartbeatFailed)
1100 }
1101
1102 if mustLogServerMessage(s) {
1103 logServerMessage(s, logger.TopologyServerHeartbeatFailed,
1104 logger.KeyAwaited, await,
1105 logger.KeyDurationMS, duration.Milliseconds(),
1106 logger.KeyFailure, err.Error())
1107 }
1108 }
1109
1110
1111 func unwrapConnectionError(err error) error {
1112
1113
1114
1115 connErr, ok := err.(ConnectionError)
1116 if ok {
1117 return connErr.Wrapped
1118 }
1119
1120 driverErr, ok := err.(driver.Error)
1121 if !ok || !driverErr.NetworkError() {
1122 return nil
1123 }
1124
1125 connErr, ok = driverErr.Wrapped.(ConnectionError)
1126 if ok {
1127 return connErr.Wrapped
1128 }
1129
1130 return nil
1131 }
1132
View as plain text