...
1
18
19 package grpclb
20
21 import (
22 "fmt"
23 "sync"
24 "time"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/resolver"
28 )
29
// subConnCacheTime is the default delay between a cached SubConn's Shutdown
// call and the moment the underlying SubConn is really shut down. It is the
// value newLBCacheClientConn assigns to lbCacheClientConn.timeout.
const subConnCacheTime = 10 * time.Second
31
32
33
34
35
36 type lbCacheClientConn struct {
37 balancer.ClientConn
38
39 timeout time.Duration
40
41 mu sync.Mutex
42
43 subConnCache map[resolver.Address]*subConnCacheEntry
44 subConnToAddr map[balancer.SubConn]resolver.Address
45 }
46
47 type subConnCacheEntry struct {
48 sc balancer.SubConn
49
50 cancel func()
51 abortDeleting bool
52 }
53
54 func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn {
55 return &lbCacheClientConn{
56 ClientConn: cc,
57 timeout: subConnCacheTime,
58 subConnCache: make(map[resolver.Address]*subConnCacheEntry),
59 subConnToAddr: make(map[balancer.SubConn]resolver.Address),
60 }
61 }
62
63 func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
64 if len(addrs) != 1 {
65 return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs))
66 }
67 addrWithoutAttrs := addrs[0]
68 addrWithoutAttrs.Attributes = nil
69
70 ccc.mu.Lock()
71 defer ccc.mu.Unlock()
72 if entry, ok := ccc.subConnCache[addrWithoutAttrs]; ok {
73
74
75 entry.cancel()
76 delete(ccc.subConnCache, addrWithoutAttrs)
77 return entry.sc, nil
78 }
79
80 scNew, err := ccc.ClientConn.NewSubConn(addrs, opts)
81 if err != nil {
82 return nil, err
83 }
84 scNew = &lbCacheSubConn{SubConn: scNew, ccc: ccc}
85
86 ccc.subConnToAddr[scNew] = addrWithoutAttrs
87 return scNew, nil
88 }
89
90 func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
91 logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
92 }
93
94 type lbCacheSubConn struct {
95 balancer.SubConn
96 ccc *lbCacheClientConn
97 }
98
99 func (sc *lbCacheSubConn) Shutdown() {
100 ccc := sc.ccc
101 ccc.mu.Lock()
102 defer ccc.mu.Unlock()
103 addr, ok := ccc.subConnToAddr[sc]
104 if !ok {
105 return
106 }
107
108 if entry, ok := ccc.subConnCache[addr]; ok {
109 if entry.sc != sc {
110
111
112
113 delete(ccc.subConnToAddr, sc)
114 sc.SubConn.Shutdown()
115 }
116 return
117 }
118
119 entry := &subConnCacheEntry{
120 sc: sc,
121 }
122 ccc.subConnCache[addr] = entry
123
124 timer := time.AfterFunc(ccc.timeout, func() {
125 ccc.mu.Lock()
126 defer ccc.mu.Unlock()
127 if entry.abortDeleting {
128 return
129 }
130 sc.SubConn.Shutdown()
131 delete(ccc.subConnToAddr, sc)
132 delete(ccc.subConnCache, addr)
133 })
134 entry.cancel = func() {
135 if !timer.Stop() {
136
137
138
139
140
141
142
143 entry.abortDeleting = true
144 }
145 }
146 }
147
148 func (ccc *lbCacheClientConn) UpdateState(s balancer.State) {
149 s.Picker = &lbCachePicker{Picker: s.Picker}
150 ccc.ClientConn.UpdateState(s)
151 }
152
153 func (ccc *lbCacheClientConn) close() {
154 ccc.mu.Lock()
155 defer ccc.mu.Unlock()
156
157 for _, entry := range ccc.subConnCache {
158 entry.cancel()
159 }
160 }
161
162 type lbCachePicker struct {
163 balancer.Picker
164 }
165
166 func (cp *lbCachePicker) Pick(i balancer.PickInfo) (balancer.PickResult, error) {
167 res, err := cp.Picker.Pick(i)
168 if err != nil {
169 return res, err
170 }
171 res.SubConn = res.SubConn.(*lbCacheSubConn).SubConn
172 return res, nil
173 }
174
View as plain text