1
18
19 package test
20
21 import (
22 "context"
23 "strings"
24 "testing"
25 "time"
26
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/connectivity"
30 "google.golang.org/grpc/credentials/insecure"
31 "google.golang.org/grpc/internal/channelz"
32 imetadata "google.golang.org/grpc/internal/metadata"
33 "google.golang.org/grpc/internal/stubserver"
34 "google.golang.org/grpc/internal/testutils"
35 rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
36 "google.golang.org/grpc/metadata"
37 "google.golang.org/grpc/resolver"
38 "google.golang.org/grpc/resolver/manual"
39 "google.golang.org/grpc/status"
40
41 testgrpc "google.golang.org/grpc/interop/grpc_testing"
42 testpb "google.golang.org/grpc/interop/grpc_testing"
43 )
44
45 const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`
46
47 func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
48 t.Helper()
49
50 r := manual.NewBuilderWithScheme("whatever")
51
52 const backendCount = 5
53 backends := make([]*stubserver.StubServer, backendCount)
54 addrs := make([]resolver.Address, backendCount)
55 for i := 0; i < backendCount; i++ {
56 backend := &stubserver.StubServer{
57 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
58 }
59 if err := backend.StartServer(); err != nil {
60 t.Fatalf("Failed to start backend: %v", err)
61 }
62 t.Logf("Started TestService backend at: %q", backend.Address)
63 t.Cleanup(func() { backend.Stop() })
64
65 backends[i] = backend
66 addrs[i] = resolver.Address{Addr: backend.Address}
67 }
68
69 dopts := []grpc.DialOption{
70 grpc.WithTransportCredentials(insecure.NewCredentials()),
71 grpc.WithResolvers(r),
72 grpc.WithDefaultServiceConfig(rrServiceConfig),
73 }
74 dopts = append(dopts, opts...)
75 cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
76 if err != nil {
77 t.Fatalf("grpc.NewClient() failed: %v", err)
78 }
79 t.Cleanup(func() { cc.Close() })
80 client := testgrpc.NewTestServiceClient(cc)
81
82
83
84 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
85 defer sCancel()
86 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
87 t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
88 }
89
90 r.UpdateState(resolver.State{Addresses: addrs})
91 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
92 t.Fatal(err)
93 }
94 return cc, r, backends
95 }
96
97
98
99
100 func (s) TestRoundRobin_Basic(t *testing.T) {
101 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
102 defer cancel()
103 testRoundRobinBasic(ctx, t)
104 }
105
106
107
108
109
110
111 func (s) TestRoundRobin_AddressesRemoved(t *testing.T) {
112 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
113 defer cancel()
114 cc, r, _ := testRoundRobinBasic(ctx, t)
115
116
117
118 r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
119 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
120
121 const msgWant = "produced zero addresses"
122 client := testgrpc.NewTestServiceClient(cc)
123 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(status.Convert(err).Message(), msgWant) {
124 t.Fatalf("EmptyCall() = %v, want Contains(Message(), %q)", err, msgWant)
125 }
126 }
127
128
129
130
131
132
133 func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) {
134 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
135 defer cancel()
136 cc, r, backends := testRoundRobinBasic(ctx, t)
137
138
139
140 r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
141 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
142
143 client := testgrpc.NewTestServiceClient(cc)
144 doneCh := make(chan struct{})
145 go func() {
146
147
148
149 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
150 t.Errorf("EmptyCall() = %v, want <nil>", err)
151 }
152 close(doneCh)
153 }()
154
155
156
157
158 for {
159 if err := ctx.Err(); err != nil {
160 t.Fatal(err)
161 }
162 tcs, _ := channelz.GetTopChannels(0, 0)
163 if len(tcs) != 1 {
164 t.Fatalf("there should only be one top channel, not %d", len(tcs))
165 }
166 started := tcs[0].ChannelMetrics.CallsStarted.Load()
167 completed := tcs[0].ChannelMetrics.CallsSucceeded.Load() + tcs[0].ChannelMetrics.CallsFailed.Load()
168 if (started - completed) == 1 {
169 break
170 }
171 time.Sleep(defaultTestShortTimeout)
172 }
173
174
175
176 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
177
178 select {
179 case <-ctx.Done():
180 t.Fatal("Timeout when waiting for blocked RPC to complete")
181 case <-doneCh:
182 }
183 }
184
185
186
187
188
189 func (s) TestRoundRobin_OneServerDown(t *testing.T) {
190 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
191 defer cancel()
192 cc, _, backends := testRoundRobinBasic(ctx, t)
193
194
195 backends[len(backends)-1].Stop()
196
197 addrs := make([]resolver.Address, len(backends)-1)
198 for i := 0; i < len(backends)-1; i++ {
199 addrs[i] = resolver.Address{Addr: backends[i].Address}
200 }
201 client := testgrpc.NewTestServiceClient(cc)
202 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
203 t.Fatalf("RPCs are not being round robined across remaining servers: %v", err)
204 }
205 }
206
207
208
209
210
211 func (s) TestRoundRobin_AllServersDown(t *testing.T) {
212 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
213 defer cancel()
214 cc, _, backends := testRoundRobinBasic(ctx, t)
215
216
217 for _, b := range backends {
218 b.Stop()
219 }
220
221 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
222
223
224 client := testgrpc.NewTestServiceClient(cc)
225
226 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
227 t.Fatalf("EmptyCall got err: %v; want Unavailable", err)
228 }
229 }
230
231
232
233
234 func (s) TestRoundRobin_UpdateAddressAttributes(t *testing.T) {
235 const (
236 testMDKey = "test-md"
237 testMDValue = "test-md-value"
238 )
239 r := manual.NewBuilderWithScheme("whatever")
240
241
242
243 testMDChan := make(chan []string, 1)
244 backend := &stubserver.StubServer{
245 EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
246 md, ok := metadata.FromIncomingContext(ctx)
247 if ok {
248 select {
249 case testMDChan <- md[testMDKey]:
250 case <-ctx.Done():
251 return nil, ctx.Err()
252 }
253 }
254 return &testpb.Empty{}, nil
255 },
256 }
257 if err := backend.StartServer(); err != nil {
258 t.Fatalf("Failed to start backend: %v", err)
259 }
260 t.Logf("Started TestService backend at: %q", backend.Address)
261 t.Cleanup(func() { backend.Stop() })
262
263
264 dopts := []grpc.DialOption{
265 grpc.WithTransportCredentials(insecure.NewCredentials()),
266 grpc.WithResolvers(r),
267 grpc.WithDefaultServiceConfig(rrServiceConfig),
268 }
269 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
270 if err != nil {
271 t.Fatalf("grpc.Dial() failed: %v", err)
272 }
273 t.Cleanup(func() { cc.Close() })
274
275
276 addr := resolver.Address{Addr: backend.Address}
277 r.UpdateState(resolver.State{Addresses: []resolver.Address{addr}})
278
279
280 client := testgrpc.NewTestServiceClient(cc)
281 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
282 defer cancel()
283 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
284 t.Fatalf("EmptyCall() = %v, want <nil>", err)
285 }
286 select {
287 case <-ctx.Done():
288 t.Fatalf("Timeout when waiting for metadata received in RPC")
289 case md := <-testMDChan:
290 if len(md) != 0 {
291 t.Fatalf("received metadata %v, want nil", md)
292 }
293 }
294
295
296 addrWithAttributes := imetadata.Set(addr, metadata.Pairs(testMDKey, testMDValue))
297 r.UpdateState(resolver.State{Addresses: []resolver.Address{addrWithAttributes}})
298
299
300
301
302 Done:
303 for {
304 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
305 t.Fatalf("EmptyCall() = %v, want <nil>", err)
306 }
307 select {
308 case <-ctx.Done():
309 t.Fatalf("Timeout when waiting for metadata received in RPC")
310 case md := <-testMDChan:
311 if len(md) == 1 && md[0] == testMDValue {
312 break Done
313 }
314 }
315 time.Sleep(defaultTestShortTimeout)
316 }
317 }
318
View as plain text