...
1
2
3
4
5
6
7 package session
8
9 import (
10 "sync"
11 "sync/atomic"
12
13 "go.mongodb.org/mongo-driver/mongo/description"
14 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
15 )
16
17
18 type Node struct {
19 *Server
20 next *Node
21 prev *Node
22 }
23
24
25
// topologyDescription is the pool's snapshot of the most recently received
// topology: only the two values passed to the sessions' expired checks are
// kept.
type topologyDescription struct {
	// kind is copied from the Kind field of the received description.Topology.
	kind description.TopologyKind
	// timeoutMinutes is the SessionTimeoutMinutesPtr pointer of the received
	// description.Topology; it is nil until a description has been received.
	timeoutMinutes *int64
}
30
31
32 type Pool struct {
33
34 checkedOut int64
35
36 descChan <-chan description.Topology
37 head *Node
38 tail *Node
39 latestTopology topologyDescription
40 mutex sync.Mutex
41 }
42
43 func (p *Pool) createServerSession() (*Server, error) {
44 s, err := newServerSession()
45 if err != nil {
46 return nil, err
47 }
48
49 atomic.AddInt64(&p.checkedOut, 1)
50 return s, nil
51 }
52
53
54 func NewPool(descChan <-chan description.Topology) *Pool {
55 p := &Pool{
56 descChan: descChan,
57 }
58
59 return p
60 }
61
62
63 func (p *Pool) updateTimeout() {
64 select {
65 case newDesc := <-p.descChan:
66 p.latestTopology = topologyDescription{
67 kind: newDesc.Kind,
68 timeoutMinutes: newDesc.SessionTimeoutMinutesPtr,
69 }
70 default:
71
72 }
73 }
74
75
76 func (p *Pool) GetSession() (*Server, error) {
77 p.mutex.Lock()
78 defer p.mutex.Unlock()
79
80
81 if p.head == nil && p.tail == nil {
82 return p.createServerSession()
83 }
84
85 p.updateTimeout()
86 for p.head != nil {
87
88 if p.head.expired(p.latestTopology) {
89 p.head = p.head.next
90 continue
91 }
92
93
94 session := p.head.Server
95 if p.head.next != nil {
96 p.head.next.prev = nil
97 }
98 if p.tail == p.head {
99 p.tail = nil
100 p.head = nil
101 } else {
102 p.head = p.head.next
103 }
104
105 atomic.AddInt64(&p.checkedOut, 1)
106 return session, nil
107 }
108
109
110 p.tail = nil
111 return p.createServerSession()
112 }
113
114
115 func (p *Pool) ReturnSession(ss *Server) {
116 if ss == nil {
117 return
118 }
119
120 p.mutex.Lock()
121 defer p.mutex.Unlock()
122
123 atomic.AddInt64(&p.checkedOut, -1)
124 p.updateTimeout()
125
126
127 for p.tail != nil && p.tail.expired(p.latestTopology) {
128 if p.tail.prev != nil {
129 p.tail.prev.next = nil
130 }
131 p.tail = p.tail.prev
132 }
133
134
135 if ss.expired(p.latestTopology) {
136 return
137 }
138
139
140 if ss.Dirty {
141 return
142 }
143
144 newNode := &Node{
145 Server: ss,
146 next: nil,
147 prev: nil,
148 }
149
150
151 if p.tail == nil {
152 p.head = newNode
153 p.tail = newNode
154 return
155 }
156
157
158 newNode.next = p.head
159 p.head.prev = newNode
160 p.head = newNode
161 }
162
163
164 func (p *Pool) IDSlice() []bsoncore.Document {
165 p.mutex.Lock()
166 defer p.mutex.Unlock()
167
168 var ids []bsoncore.Document
169 for node := p.head; node != nil; node = node.next {
170 ids = append(ids, node.SessionID)
171 }
172
173 return ids
174 }
175
176
177 func (p *Pool) String() string {
178 p.mutex.Lock()
179 defer p.mutex.Unlock()
180
181 s := ""
182 for head := p.head; head != nil; head = head.next {
183 s += head.SessionID.String() + "\n"
184 }
185
186 return s
187 }
188
189
190 func (p *Pool) CheckedOut() int64 {
191 return atomic.LoadInt64(&p.checkedOut)
192 }
193
View as plain text