1
17
18 package leastrequest
19
20 import (
21 "context"
22 "encoding/json"
23 "fmt"
24 "strings"
25 "sync"
26 "testing"
27 "time"
28
29 "github.com/google/go-cmp/cmp"
30
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/credentials/insecure"
33 "google.golang.org/grpc/internal"
34 "google.golang.org/grpc/internal/grpctest"
35 "google.golang.org/grpc/internal/stubserver"
36 testgrpc "google.golang.org/grpc/interop/grpc_testing"
37 testpb "google.golang.org/grpc/interop/grpc_testing"
38 "google.golang.org/grpc/peer"
39 "google.golang.org/grpc/resolver"
40 "google.golang.org/grpc/resolver/manual"
41 "google.golang.org/grpc/serviceconfig"
42 )
43
// defaultTestTimeout is the deadline applied to the context that each
// end-to-end test in this file uses for its RPCs.
const defaultTestTimeout = 5 * time.Second
47
48 type s struct {
49 grpctest.Tester
50 }
51
52 func Test(t *testing.T) {
53 grpctest.RunSubTests(t, s{})
54 }
55
56 func (s) TestParseConfig(t *testing.T) {
57 parser := bb{}
58 tests := []struct {
59 name string
60 input string
61 wantCfg serviceconfig.LoadBalancingConfig
62 wantErr string
63 }{
64 {
65 name: "happy-case-default",
66 input: `{}`,
67 wantCfg: &LBConfig{
68 ChoiceCount: 2,
69 },
70 },
71 {
72 name: "happy-case-choice-count-set",
73 input: `{"choiceCount": 3}`,
74 wantCfg: &LBConfig{
75 ChoiceCount: 3,
76 },
77 },
78 {
79 name: "happy-case-choice-count-greater-than-ten",
80 input: `{"choiceCount": 11}`,
81 wantCfg: &LBConfig{
82 ChoiceCount: 10,
83 },
84 },
85 {
86 name: "choice-count-less-than-2",
87 input: `{"choiceCount": 1}`,
88 wantErr: "must be >= 2",
89 },
90 {
91 name: "invalid-json",
92 input: "{{invalidjson{{",
93 wantErr: "invalid character",
94 },
95 }
96 for _, test := range tests {
97 t.Run(test.name, func(t *testing.T) {
98 gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
99
100
101
102
103
104 if (gotErr != nil) != (test.wantErr != "") {
105 t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
106 }
107 if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
108 t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
109 }
110 if test.wantErr != "" {
111 return
112 }
113 if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" {
114 t.Fatalf("ParseConfig(%v) got unexpected output, diff (-got +want): %v", test.input, diff)
115 }
116 })
117 }
118 }
119
120
121
122
123 func setupBackends(t *testing.T) []string {
124 t.Helper()
125 const numBackends = 3
126 addresses := make([]string, numBackends)
127
128 for i := 0; i < numBackends; i++ {
129 backend := &stubserver.StubServer{
130 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
131 return &testpb.Empty{}, nil
132 },
133 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
134 <-stream.Context().Done()
135 return nil
136 },
137 }
138 if err := backend.StartServer(); err != nil {
139 t.Fatalf("Failed to start backend: %v", err)
140 }
141 t.Logf("Started good TestService backend at: %q", backend.Address)
142 t.Cleanup(func() { backend.Stop() })
143 addresses[i] = backend.Address
144 }
145 return addresses
146 }
147
148
149
150
151
152
153
154 func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
155 wantAddrCount := make(map[string]int)
156 for _, addr := range addrs {
157 wantAddrCount[addr.Addr]++
158 }
159 gotAddrCount := make(map[string]int)
160 for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
161 gotAddrCount = make(map[string]int)
162
163 var iterations [][]string
164 for i := 0; i < 3; i++ {
165 iteration := make([]string, len(addrs))
166 for c := 0; c < len(addrs); c++ {
167 var peer peer.Peer
168 client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer))
169 iteration[c] = peer.Addr.String()
170 }
171 iterations = append(iterations, iteration)
172 }
173
174 for _, addr := range iterations[0] {
175 gotAddrCount[addr]++
176 }
177 if !cmp.Equal(gotAddrCount, wantAddrCount) {
178 continue
179 }
180
181 if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
182 continue
183 }
184 return nil
185 }
186 return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v; got: %v", addrs, gotAddrCount)
187 }
188
189
190
191
192
193
194
195 func (s) TestLeastRequestE2E(t *testing.T) {
196 defer func(u func() uint32) {
197 grpcranduint32 = u
198 }(grpcranduint32)
199 var index int
200 indexes := []uint32{
201 0, 0, 1, 1, 2, 2,
202 }
203 grpcranduint32 = func() uint32 {
204 ret := indexes[index%len(indexes)]
205 index++
206 return ret
207 }
208 addresses := setupBackends(t)
209
210 mr := manual.NewBuilderWithScheme("lr-e2e")
211 defer mr.Close()
212
213
214 lrscJSON := `
215 {
216 "loadBalancingConfig": [
217 {
218 "least_request_experimental": {
219 "choiceCount": 2
220 }
221 }
222 ]
223 }`
224 sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
225 firstThreeAddresses := []resolver.Address{
226 {Addr: addresses[0]},
227 {Addr: addresses[1]},
228 {Addr: addresses[2]},
229 }
230 mr.InitialState(resolver.State{
231 Addresses: firstThreeAddresses,
232 ServiceConfig: sc,
233 })
234
235 cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
236 if err != nil {
237 t.Fatalf("grpc.NewClient() failed: %v", err)
238 }
239 defer cc.Close()
240 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
241 defer cancel()
242 testServiceClient := testgrpc.NewTestServiceClient(cc)
243
244
245
246
247
248 if err := checkRoundRobinRPCs(ctx, testServiceClient, firstThreeAddresses); err != nil {
249 t.Fatalf("error in expected round robin: %v", err)
250 }
251
252
253
254
255 index = 0
256 peerAtIndex := make([]string, 3)
257 var peer0 peer.Peer
258 if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
259 t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
260 }
261 peerAtIndex[0] = peer0.Addr.String()
262 if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
263 t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
264 }
265 peerAtIndex[1] = peer0.Addr.String()
266 if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
267 t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
268 }
269 peerAtIndex[2] = peer0.Addr.String()
270
271
272
273
274 index = 0
275 indexes = []uint32{
276 0, 0,
277 0, 1,
278 1, 2,
279 0, 3,
280 1, 0,
281 2, 0,
282 0, 0,
283 2, 2,
284 2, 1,
285 }
286 wantIndex := []uint32{0, 1, 2, 0, 1, 2, 0, 2, 1}
287
288
289
290
291 for _, wantIndex := range wantIndex {
292 stream, err := testServiceClient.FullDuplexCall(ctx)
293 if err != nil {
294 t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
295 }
296 p, ok := peer.FromContext(stream.Context())
297 if !ok {
298 t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
299 }
300 if p.Addr.String() != peerAtIndex[wantIndex] {
301 t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), peerAtIndex[wantIndex])
302 }
303 }
304 }
305
306
307
308
309
310
311 func (s) TestLeastRequestPersistsCounts(t *testing.T) {
312 defer func(u func() uint32) {
313 grpcranduint32 = u
314 }(grpcranduint32)
315 var index int
316 indexes := []uint32{
317 0, 0, 1, 1,
318 }
319 grpcranduint32 = func() uint32 {
320 ret := indexes[index%len(indexes)]
321 index++
322 return ret
323 }
324 addresses := setupBackends(t)
325
326 mr := manual.NewBuilderWithScheme("lr-e2e")
327 defer mr.Close()
328
329
330 lrscJSON := `
331 {
332 "loadBalancingConfig": [
333 {
334 "least_request_experimental": {
335 "choiceCount": 2
336 }
337 }
338 ]
339 }`
340 sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
341 firstTwoAddresses := []resolver.Address{
342 {Addr: addresses[0]},
343 {Addr: addresses[1]},
344 }
345 mr.InitialState(resolver.State{
346 Addresses: firstTwoAddresses,
347 ServiceConfig: sc,
348 })
349
350 cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
351 if err != nil {
352 t.Fatalf("grpc.NewClient() failed: %v", err)
353 }
354 defer cc.Close()
355 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
356 defer cancel()
357 testServiceClient := testgrpc.NewTestServiceClient(cc)
358
359
360
361
362
363 if err := checkRoundRobinRPCs(ctx, testServiceClient, firstTwoAddresses); err != nil {
364 t.Fatalf("error in expected round robin: %v", err)
365 }
366
367
368
369
370 for i := 0; i < 50; i++ {
371 _, err := testServiceClient.FullDuplexCall(ctx)
372 if err != nil {
373 t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
374 }
375 }
376
377
378
379
380
381
382
383 index = 0
384 indexes = []uint32{
385 0, 1, 2, 3, 4, 5,
386 }
387 lrscJSON = `
388 {
389 "loadBalancingConfig": [
390 {
391 "least_request_experimental": {
392 "choiceCount": 3
393 }
394 }
395 ]
396 }`
397 sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
398 fullAddresses := []resolver.Address{
399 {Addr: addresses[0]},
400 {Addr: addresses[1]},
401 {Addr: addresses[2]},
402 }
403 mr.UpdateState(resolver.State{
404 Addresses: fullAddresses,
405 ServiceConfig: sc,
406 })
407 newAddress := fullAddresses[2]
408
409
410
411 if err := checkRoundRobinRPCs(ctx, testServiceClient, []resolver.Address{newAddress}); err != nil {
412 t.Fatalf("error in expected round robin: %v", err)
413 }
414
415
416
417
418 for i := 0; i < 25; i++ {
419 stream, err := testServiceClient.FullDuplexCall(ctx)
420 if err != nil {
421 t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
422 }
423 p, ok := peer.FromContext(stream.Context())
424 if !ok {
425 t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
426 }
427 if p.Addr.String() != addresses[2] {
428 t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), addresses[2])
429 }
430 }
431
432
433
434
435
436 wantAddrCount := map[string]int{
437 addresses[0]: 1,
438 addresses[1]: 1,
439 addresses[2]: 1,
440 }
441 gotAddrCount := make(map[string]int)
442 for i := 0; i < len(addresses); i++ {
443 stream, err := testServiceClient.FullDuplexCall(ctx)
444 if err != nil {
445 t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
446 }
447 p, ok := peer.FromContext(stream.Context())
448 if !ok {
449 t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
450 }
451 if p.Addr != nil {
452 gotAddrCount[p.Addr.String()]++
453 }
454 }
455 if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
456 t.Fatalf("addr count (-got:, +want): %v", diff)
457 }
458 }
459
460
461
462
463
464 func (s) TestConcurrentRPCs(t *testing.T) {
465 addresses := setupBackends(t)
466
467 mr := manual.NewBuilderWithScheme("lr-e2e")
468 defer mr.Close()
469
470
471 lrscJSON := `
472 {
473 "loadBalancingConfig": [
474 {
475 "least_request_experimental": {
476 "choiceCount": 2
477 }
478 }
479 ]
480 }`
481 sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
482 firstTwoAddresses := []resolver.Address{
483 {Addr: addresses[0]},
484 {Addr: addresses[1]},
485 }
486 mr.InitialState(resolver.State{
487 Addresses: firstTwoAddresses,
488 ServiceConfig: sc,
489 })
490
491 cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
492 if err != nil {
493 t.Fatalf("grpc.NewClient() failed: %v", err)
494 }
495 defer cc.Close()
496 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
497 defer cancel()
498 testServiceClient := testgrpc.NewTestServiceClient(cc)
499
500 var wg sync.WaitGroup
501 for i := 0; i < 100; i++ {
502 wg.Add(1)
503 go func() {
504 defer wg.Done()
505 for j := 0; j < 5; j++ {
506 testServiceClient.EmptyCall(ctx, &testpb.Empty{})
507 }
508 }()
509 }
510 wg.Wait()
511
512 }
513
View as plain text