...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package cluster
15
16 import (
17 "crypto/tls"
18 "fmt"
19 "sync"
20 "time"
21
22 lru "github.com/hashicorp/golang-lru"
23 "github.com/pkg/errors"
24 )
25
// capacity is the size handed to the LRU cache that backs connectionPool.
const (
	capacity = 1024
)
27
28 type connectionPool struct {
29 mtx sync.Mutex
30 cache *lru.Cache
31 tlsConfig *tls.Config
32 }
33
34 func newConnectionPool(tlsClientCfg *tls.Config) (*connectionPool, error) {
35 cache, err := lru.NewWithEvict(
36 capacity, func(_, value interface{}) {
37 conn, ok := value.(*tlsConn)
38 if ok {
39 _ = conn.Close()
40 }
41 },
42 )
43 if err != nil {
44 return nil, errors.Wrap(err, "failed to create new LRU")
45 }
46 return &connectionPool{
47 cache: cache,
48 tlsConfig: tlsClientCfg,
49 }, nil
50 }
51
52
53
54 func (pool *connectionPool) borrowConnection(addr string, timeout time.Duration) (*tlsConn, error) {
55 pool.mtx.Lock()
56 defer pool.mtx.Unlock()
57 if pool.cache == nil {
58 return nil, errors.New("connection pool closed")
59 }
60 key := fmt.Sprintf("%s/%d", addr, int64(timeout))
61 value, exists := pool.cache.Get(key)
62 if exists {
63 conn, ok := value.(*tlsConn)
64 if ok && conn.alive() {
65 return conn, nil
66 }
67 }
68 conn, err := dialTLSConn(addr, timeout, pool.tlsConfig)
69 if err != nil {
70 return nil, err
71 }
72 pool.cache.Add(key, conn)
73 return conn, nil
74 }
75
76 func (pool *connectionPool) shutdown() {
77 pool.mtx.Lock()
78 defer pool.mtx.Unlock()
79 if pool.cache == nil {
80 return
81 }
82 pool.cache.Purge()
83 pool.cache = nil
84 }
85
View as plain text