...
1
2
3
4
5
6
7 package topology
8
9 import (
10 "sync"
11 "sync/atomic"
12
13 "go.mongodb.org/mongo-driver/bson/primitive"
14 )
15
16
// Lifecycle states stored in poolGenerationMap.state. The disconnected state
// is the zero value, so a freshly constructed poolGenerationMap reports itself
// as disconnected until connect is called.
const (
	generationDisconnected int64 = 0
	generationConnected    int64 = 1
)
21
22
23
// generationStats is the per-service record kept by poolGenerationMap.
//
// generation is the current generation number for the service; it is bumped
// by poolGenerationMap.clear. numConns is the number of connections currently
// attributed to the service; it is adjusted by addConnection and
// removeConnection.
type generationStats struct {
	generation, numConns uint64
}
28
29
30
31
32 type poolGenerationMap struct {
33
34
35
36 state int64
37 generationMap map[primitive.ObjectID]*generationStats
38
39 sync.Mutex
40 }
41
42 func newPoolGenerationMap() *poolGenerationMap {
43 pgm := &poolGenerationMap{
44 generationMap: make(map[primitive.ObjectID]*generationStats),
45 }
46 pgm.generationMap[primitive.NilObjectID] = &generationStats{}
47 return pgm
48 }
49
50 func (p *poolGenerationMap) connect() {
51 atomic.StoreInt64(&p.state, generationConnected)
52 }
53
54 func (p *poolGenerationMap) disconnect() {
55 atomic.StoreInt64(&p.state, generationDisconnected)
56 }
57
58
59
60 func (p *poolGenerationMap) addConnection(serviceIDPtr *primitive.ObjectID) uint64 {
61 serviceID := getServiceID(serviceIDPtr)
62 p.Lock()
63 defer p.Unlock()
64
65 stats, ok := p.generationMap[serviceID]
66 if ok {
67
68 stats.numConns++
69 return stats.generation
70 }
71
72
73 stats = &generationStats{
74 numConns: 1,
75 }
76 p.generationMap[serviceID] = stats
77 return 0
78 }
79
80 func (p *poolGenerationMap) removeConnection(serviceIDPtr *primitive.ObjectID) {
81 serviceID := getServiceID(serviceIDPtr)
82 p.Lock()
83 defer p.Unlock()
84
85 stats, ok := p.generationMap[serviceID]
86 if !ok {
87 return
88 }
89
90
91
92
93 stats.numConns--
94 if stats.numConns == 0 {
95 delete(p.generationMap, serviceID)
96 }
97 }
98
99 func (p *poolGenerationMap) clear(serviceIDPtr *primitive.ObjectID) {
100 serviceID := getServiceID(serviceIDPtr)
101 p.Lock()
102 defer p.Unlock()
103
104 if stats, ok := p.generationMap[serviceID]; ok {
105 stats.generation++
106 }
107 }
108
109 func (p *poolGenerationMap) stale(serviceIDPtr *primitive.ObjectID, knownGeneration uint64) bool {
110
111 if atomic.LoadInt64(&p.state) == generationDisconnected {
112 return true
113 }
114
115 if generation, ok := p.getGeneration(serviceIDPtr); ok {
116 return knownGeneration < generation
117 }
118 return false
119 }
120
121 func (p *poolGenerationMap) getGeneration(serviceIDPtr *primitive.ObjectID) (uint64, bool) {
122 serviceID := getServiceID(serviceIDPtr)
123 p.Lock()
124 defer p.Unlock()
125
126 if stats, ok := p.generationMap[serviceID]; ok {
127 return stats.generation, true
128 }
129 return 0, false
130 }
131
132 func (p *poolGenerationMap) getNumConns(serviceIDPtr *primitive.ObjectID) uint64 {
133 serviceID := getServiceID(serviceIDPtr)
134 p.Lock()
135 defer p.Unlock()
136
137 if stats, ok := p.generationMap[serviceID]; ok {
138 return stats.numConns
139 }
140 return 0
141 }
142
143 func getServiceID(oid *primitive.ObjectID) primitive.ObjectID {
144 if oid == nil {
145 return primitive.NilObjectID
146 }
147 return *oid
148 }
149
View as plain text