1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "fmt"
19 "sort"
20 "sync"
21 "time"
22 )
23
24 type poolRouter interface {
25
26
27 poolAttach(pool *connectionPool) error
28
29
30
31 poolDetach() error
32
33
34
35 writerAttach(writer *ManagedStream) error
36
37
38
39 writerDetach(writer *ManagedStream) error
40
41
42 pickConnection(pw *pendingWrite) (*connection, error)
43 }
44
45
46
47
48
49 type simpleRouter struct {
50 mode connectionMode
51 pool *connectionPool
52
53 mu sync.RWMutex
54 conn *connection
55 writers map[string]struct{}
56 }
57
58 func (rtr *simpleRouter) poolAttach(pool *connectionPool) error {
59 if rtr.pool == nil {
60 rtr.pool = pool
61 return nil
62 }
63 return fmt.Errorf("router already attached to pool %q", rtr.pool.id)
64 }
65
66 func (rtr *simpleRouter) poolDetach() error {
67 rtr.mu.Lock()
68 defer rtr.mu.Unlock()
69 if rtr.conn != nil {
70 rtr.conn.close()
71 rtr.conn = nil
72 }
73 return nil
74 }
75
76 func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error {
77 if writer.id == "" {
78 return fmt.Errorf("writer has no ID")
79 }
80 rtr.mu.Lock()
81 defer rtr.mu.Unlock()
82 rtr.writers[writer.id] = struct{}{}
83 if rtr.conn == nil {
84 rtr.conn = newConnection(rtr.pool, rtr.mode, nil)
85 }
86 return nil
87 }
88
89 func (rtr *simpleRouter) writerDetach(writer *ManagedStream) error {
90 if writer.id == "" {
91 return fmt.Errorf("writer has no ID")
92 }
93 rtr.mu.Lock()
94 defer rtr.mu.Unlock()
95 delete(rtr.writers, writer.id)
96 if len(rtr.writers) == 0 && rtr.conn != nil {
97
98 defer rtr.conn.close()
99 rtr.conn = nil
100 }
101 return nil
102 }
103
104
105 func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) {
106 rtr.mu.RLock()
107 defer rtr.mu.RUnlock()
108 if rtr.conn != nil {
109 return rtr.conn, nil
110 }
111 return nil, fmt.Errorf("no connection available")
112 }
113
114 func newSimpleRouter(mode connectionMode) *simpleRouter {
115 return &simpleRouter{
116
117 mode: mode,
118 writers: make(map[string]struct{}),
119 }
120 }
121
122
123
124
125
126
127
128
129
130
131 type sharedRouter struct {
132 pool *connectionPool
133 multiplex bool
134 maxConns int
135 close chan struct{}
136
137
138 mu sync.RWMutex
139
140 exclusiveConns map[string]*connection
141
142
143 multiMu sync.RWMutex
144
145 multiMap map[string]*connection
146
147 invertedMultiMap map[string][]*ManagedStream
148 multiConns []*connection
149 }
150
151 type connPair struct {
152 writer *ManagedStream
153 conn *connection
154 }
155
156
157
158
159 func (sr *sharedRouter) poolAttach(pool *connectionPool) error {
160 if sr.pool == nil {
161 sr.pool = pool
162 sr.close = make(chan struct{})
163 if sr.multiplex {
164 go sr.watchdog()
165 }
166 return nil
167 }
168 return fmt.Errorf("router already attached to pool %q", sr.pool.id)
169 }
170
171
172
173 func (sr *sharedRouter) poolDetach() error {
174 sr.mu.Lock()
175
176 for writerID, conn := range sr.exclusiveConns {
177 conn.close()
178 delete(sr.exclusiveConns, writerID)
179 }
180 sr.mu.Unlock()
181
182 sr.multiMu.Lock()
183 for _, co := range sr.multiConns {
184 co.close()
185 }
186 sr.multiMap = make(map[string]*connection)
187 sr.multiConns = nil
188 close(sr.close)
189 sr.multiMu.Unlock()
190 return nil
191 }
192
193 func (sr *sharedRouter) writerAttach(writer *ManagedStream) error {
194 if writer == nil {
195 return fmt.Errorf("invalid writer")
196 }
197 if writer.id == "" {
198 return fmt.Errorf("writer has empty ID")
199 }
200 if sr.multiplex && canMultiplex(writer.StreamName()) {
201 return sr.writerAttachMulti(writer)
202 }
203
204 sr.mu.Lock()
205 defer sr.mu.Unlock()
206 if pair := sr.exclusiveConns[writer.id]; pair != nil {
207 return fmt.Errorf("writer %q already attached", writer.id)
208 }
209 sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode, writer.streamSettings)
210 return nil
211 }
212
213
214
215
216
217 func (sr *sharedRouter) writerAttachMulti(writer *ManagedStream) error {
218 sr.multiMu.Lock()
219 defer sr.multiMu.Unlock()
220
221 sr.orderAndGrowMultiConns()
222 conn := sr.multiConns[0]
223 sr.multiMap[writer.id] = conn
224 var writers []*ManagedStream
225 if w, ok := sr.invertedMultiMap[conn.id]; ok {
226 writers = append(w, writer)
227 } else {
228
229 writers = []*ManagedStream{writer}
230 }
231 sr.invertedMultiMap[conn.id] = writers
232 return nil
233 }
234
235
236
237
238
239 func (sr *sharedRouter) orderAndGrowMultiConns() {
240 sort.SliceStable(sr.multiConns,
241 func(i, j int) bool {
242 return sr.multiConns[i].curLoad() < sr.multiConns[j].curLoad()
243 })
244 if len(sr.multiConns) == 0 {
245 sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}
246 } else if sr.multiConns[0].isLoaded() && len(sr.multiConns) < sr.maxConns {
247 sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}, sr.multiConns...)
248 }
249 }
250
var (
	// connLoadDeltaThreshold is the load ratio used by rebalanceWriters: a
	// connection is only considered for shedding a writer when its curLoad
	// is at least this multiple of the most idle connection's curLoad.
	connLoadDeltaThreshold = 1.2

	// watchDogInterval is the delay the watchdog waits between pulses.
	watchDogInterval = time.Second / 2
)
256
257
258
259
260
261
262
263 func (sr *sharedRouter) rebalanceWriters() {
264 mostIdleIdx := 0
265 leastIdleIdx := len(sr.multiConns) - 1
266
267 mostIdleConn := sr.multiConns[0]
268 mostIdleLoad := mostIdleConn.curLoad()
269 if mostIdleConn.isLoaded() {
270
271 return
272 }
273
274 for mostIdleIdx != leastIdleIdx {
275 targetConn := sr.multiConns[leastIdleIdx]
276 if targetConn.curLoad() < mostIdleLoad*connLoadDeltaThreshold {
277
278
279 return
280 }
281 candidates, ok := sr.invertedMultiMap[targetConn.id]
282 if !ok {
283 leastIdleIdx = leastIdleIdx - 1
284 continue
285 }
286 if len(candidates) == 1 {
287 leastIdleIdx = leastIdleIdx - 1
288 continue
289 }
290
291 candidate, remaining := candidates[0], candidates[1:]
292
293 sr.multiMap[candidate.id] = mostIdleConn
294
295 sr.invertedMultiMap[targetConn.id] = remaining
296 idleWriters, ok := sr.invertedMultiMap[mostIdleConn.id]
297 if ok {
298 sr.invertedMultiMap[mostIdleConn.id] = append(idleWriters, candidate)
299 } else {
300 sr.invertedMultiMap[mostIdleConn.id] = []*ManagedStream{candidate}
301 }
302 return
303 }
304
305 }
306
307 func (sr *sharedRouter) writerDetach(writer *ManagedStream) error {
308 if writer == nil {
309 return fmt.Errorf("invalid writer")
310 }
311 if sr.multiplex && canMultiplex(writer.StreamName()) {
312 return sr.writerDetachMulti(writer)
313 }
314
315 sr.mu.Lock()
316 defer sr.mu.Unlock()
317 conn := sr.exclusiveConns[writer.id]
318 if conn == nil {
319 return fmt.Errorf("writer not currently attached")
320 }
321 conn.close()
322 delete(sr.exclusiveConns, writer.id)
323 return nil
324 }
325
326
327
328 func (sr *sharedRouter) writerDetachMulti(writer *ManagedStream) error {
329 sr.multiMu.Lock()
330 defer sr.multiMu.Unlock()
331 delete(sr.multiMap, writer.id)
332
333 if len(sr.multiMap) == 0 {
334 for _, co := range sr.multiConns {
335 co.close()
336 }
337 sr.multiConns = nil
338 }
339 return nil
340 }
341
342
343
344 func (sr *sharedRouter) pickConnection(pw *pendingWrite) (*connection, error) {
345 if pw.writer == nil {
346 return nil, fmt.Errorf("no writer present pending write")
347 }
348 if sr.multiplex && canMultiplex(pw.writer.StreamName()) {
349 return sr.pickMultiplexConnection(pw)
350 }
351 sr.mu.RLock()
352 defer sr.mu.RUnlock()
353 conn := sr.exclusiveConns[pw.writer.id]
354 if conn == nil {
355 return nil, fmt.Errorf("writer %q unknown", pw.writer.id)
356 }
357 return conn, nil
358 }
359
360 func (sr *sharedRouter) pickMultiplexConnection(pw *pendingWrite) (*connection, error) {
361 sr.multiMu.RLock()
362 defer sr.multiMu.RUnlock()
363 conn := sr.multiMap[pw.writer.id]
364 if conn == nil {
365
366 return nil, fmt.Errorf("no multiplex connection assigned")
367 }
368 return conn, nil
369 }
370
371
372
373
374
375
376
377
378
379
380 func (sr *sharedRouter) watchdog() {
381 for {
382 select {
383 case <-sr.close:
384 return
385 case <-time.After(watchDogInterval):
386 sr.watchdogPulse()
387 }
388 }
389 }
390
391
392 func (sr *sharedRouter) watchdogPulse() {
393 sr.multiMu.Lock()
394 defer sr.multiMu.Unlock()
395 sr.orderAndGrowMultiConns()
396 sr.rebalanceWriters()
397 }
398
399 func newSharedRouter(multiplex bool, maxConns int) *sharedRouter {
400 return &sharedRouter{
401 multiplex: multiplex,
402 maxConns: maxConns,
403 exclusiveConns: make(map[string]*connection),
404 multiMap: make(map[string]*connection),
405 invertedMultiMap: make(map[string][]*ManagedStream),
406 }
407 }
408
View as plain text