1
18
19 package grpclb
20
21 import (
22 "fmt"
23 "sync"
24 "testing"
25 "time"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/resolver"
29 )
30
31 type mockSubConn struct {
32 balancer.SubConn
33 mcc *mockClientConn
34 }
35
36 func (msc *mockSubConn) Shutdown() {
37 msc.mcc.mu.Lock()
38 defer msc.mcc.mu.Unlock()
39 delete(msc.mcc.subConns, msc)
40 }
41
42 type mockClientConn struct {
43 balancer.ClientConn
44
45 mu sync.Mutex
46 subConns map[balancer.SubConn]resolver.Address
47 }
48
49 func newMockClientConn() *mockClientConn {
50 return &mockClientConn{
51 subConns: make(map[balancer.SubConn]resolver.Address),
52 }
53 }
54
55 func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
56 sc := &mockSubConn{mcc: mcc}
57 mcc.mu.Lock()
58 defer mcc.mu.Unlock()
59 mcc.subConns[sc] = addrs[0]
60 return sc, nil
61 }
62
63 func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
64 panic(fmt.Sprintf("RemoveSubConn(%v) called unexpectedly", sc))
65 }
66
67 const testCacheTimeout = 100 * time.Millisecond
68
69 func checkMockCC(mcc *mockClientConn, scLen int) error {
70 mcc.mu.Lock()
71 defer mcc.mu.Unlock()
72 if len(mcc.subConns) != scLen {
73 return fmt.Errorf("mcc = %+v, want len(mcc.subConns) = %v", mcc.subConns, scLen)
74 }
75 return nil
76 }
77
78 func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error {
79 ccc.mu.Lock()
80 defer ccc.mu.Unlock()
81 if len(ccc.subConnCache) != sccLen {
82 return fmt.Errorf("ccc = %+v, want len(ccc.subConnCache) = %v", ccc.subConnCache, sccLen)
83 }
84 if len(ccc.subConnToAddr) != sctaLen {
85 return fmt.Errorf("ccc = %+v, want len(ccc.subConnToAddr) = %v", ccc.subConnToAddr, sctaLen)
86 }
87 return nil
88 }
89
90
91 func (s) TestLBCacheClientConnExpire(t *testing.T) {
92 mcc := newMockClientConn()
93 if err := checkMockCC(mcc, 0); err != nil {
94 t.Fatal(err)
95 }
96
97 ccc := newLBCacheClientConn(mcc)
98 ccc.timeout = testCacheTimeout
99 if err := checkCacheCC(ccc, 0, 0); err != nil {
100 t.Fatal(err)
101 }
102
103 sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
104
105 if err := checkMockCC(mcc, 1); err != nil {
106 t.Fatal(err)
107 }
108
109 if err := checkCacheCC(ccc, 0, 1); err != nil {
110 t.Fatal(err)
111 }
112
113 sc.Shutdown()
114
115 if err := checkMockCC(mcc, 1); err != nil {
116 t.Fatal(err)
117 }
118
119 if err := checkCacheCC(ccc, 1, 1); err != nil {
120 t.Fatal(err)
121 }
122
123
124 var err error
125 for i := 0; i < 2; i++ {
126 time.Sleep(testCacheTimeout)
127 err = checkMockCC(mcc, 0)
128 if err != nil {
129 continue
130 }
131 err = checkCacheCC(ccc, 0, 0)
132 if err != nil {
133 continue
134 }
135 }
136 if err != nil {
137 t.Fatal(err)
138 }
139 }
140
141
142
143 func (s) TestLBCacheClientConnReuse(t *testing.T) {
144 mcc := newMockClientConn()
145 if err := checkMockCC(mcc, 0); err != nil {
146 t.Fatal(err)
147 }
148
149 ccc := newLBCacheClientConn(mcc)
150 ccc.timeout = testCacheTimeout
151 if err := checkCacheCC(ccc, 0, 0); err != nil {
152 t.Fatal(err)
153 }
154
155 sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
156
157 if err := checkMockCC(mcc, 1); err != nil {
158 t.Fatal(err)
159 }
160
161 if err := checkCacheCC(ccc, 0, 1); err != nil {
162 t.Fatal(err)
163 }
164
165 sc.Shutdown()
166
167 if err := checkMockCC(mcc, 1); err != nil {
168 t.Fatal(err)
169 }
170
171 if err := checkCacheCC(ccc, 1, 1); err != nil {
172 t.Fatal(err)
173 }
174
175
176 sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
177
178 if err := checkMockCC(mcc, 1); err != nil {
179 t.Fatal(err)
180 }
181
182 if err := checkCacheCC(ccc, 0, 1); err != nil {
183 t.Fatal(err)
184 }
185
186 var err error
187
188 time.Sleep(2 * testCacheTimeout)
189 err = checkMockCC(mcc, 1)
190 if err != nil {
191 t.Fatal(err)
192 }
193 err = checkCacheCC(ccc, 0, 1)
194 if err != nil {
195 t.Fatal(err)
196 }
197
198
199 sc.Shutdown()
200
201 if err := checkMockCC(mcc, 1); err != nil {
202 t.Fatal(err)
203 }
204
205 if err := checkCacheCC(ccc, 1, 1); err != nil {
206 t.Fatal(err)
207 }
208
209
210 for i := 0; i < 2; i++ {
211 time.Sleep(testCacheTimeout)
212 err = checkMockCC(mcc, 0)
213 if err != nil {
214 continue
215 }
216 err = checkCacheCC(ccc, 0, 0)
217 if err != nil {
218 continue
219 }
220 }
221 if err != nil {
222 t.Fatal(err)
223 }
224 }
225
226
227
228 func (s) TestLBCache_ShutdownTimer_New_Race(t *testing.T) {
229 mcc := newMockClientConn()
230 if err := checkMockCC(mcc, 0); err != nil {
231 t.Fatal(err)
232 }
233
234 ccc := newLBCacheClientConn(mcc)
235 ccc.timeout = time.Nanosecond
236 if err := checkCacheCC(ccc, 0, 0); err != nil {
237 t.Fatal(err)
238 }
239
240 sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
241
242 if err := checkMockCC(mcc, 1); err != nil {
243 t.Fatal(err)
244 }
245
246 if err := checkCacheCC(ccc, 0, 1); err != nil {
247 t.Fatal(err)
248 }
249
250 done := make(chan struct{})
251
252 go func() {
253 for i := 0; i < 1000; i++ {
254
255
256 sc.Shutdown()
257 sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
258 }
259 close(done)
260 }()
261
262 select {
263 case <-time.After(time.Second):
264 t.Fatalf("Test didn't finish within 1 second. Deadlock")
265 case <-done:
266 }
267 }
268
View as plain text