1
18
19 package weightedroundrobin_test
20
21 import (
22 "context"
23 "encoding/json"
24 "fmt"
25 "sync"
26 "sync/atomic"
27 "testing"
28 "time"
29
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/internal"
32 "google.golang.org/grpc/internal/grpctest"
33 "google.golang.org/grpc/internal/stubserver"
34 "google.golang.org/grpc/internal/testutils/roundrobin"
35 "google.golang.org/grpc/orca"
36 "google.golang.org/grpc/peer"
37 "google.golang.org/grpc/resolver"
38
39 wrr "google.golang.org/grpc/balancer/weightedroundrobin"
40 iwrr "google.golang.org/grpc/balancer/weightedroundrobin/internal"
41
42 testgrpc "google.golang.org/grpc/interop/grpc_testing"
43 testpb "google.golang.org/grpc/interop/grpc_testing"
44 )
45
46 type s struct {
47 grpctest.Tester
48 }
49
50 func Test(t *testing.T) {
51 grpctest.RunSubTests(t, s{})
52 }
53
54 const defaultTestTimeout = 10 * time.Second
55 const weightUpdatePeriod = 50 * time.Millisecond
56 const weightExpirationPeriod = time.Minute
57 const oobReportingInterval = 10 * time.Millisecond
58
59 func init() {
60 iwrr.AllowAnyWeightUpdatePeriod = true
61 }
62
63 func boolp(b bool) *bool { return &b }
64 func float64p(f float64) *float64 { return &f }
65 func stringp(s string) *string { return &s }
66
67 var (
68 perCallConfig = iwrr.LBConfig{
69 EnableOOBLoadReport: boolp(false),
70 OOBReportingPeriod: stringp("0.005s"),
71 BlackoutPeriod: stringp("0s"),
72 WeightExpirationPeriod: stringp("60s"),
73 WeightUpdatePeriod: stringp(".050s"),
74 ErrorUtilizationPenalty: float64p(0),
75 }
76 oobConfig = iwrr.LBConfig{
77 EnableOOBLoadReport: boolp(true),
78 OOBReportingPeriod: stringp("0.005s"),
79 BlackoutPeriod: stringp("0s"),
80 WeightExpirationPeriod: stringp("60s"),
81 WeightUpdatePeriod: stringp(".050s"),
82 ErrorUtilizationPenalty: float64p(0),
83 }
84 )
85
86 type testServer struct {
87 *stubserver.StubServer
88
89 oobMetrics orca.ServerMetricsRecorder
90 callMetrics orca.CallMetricsRecorder
91 }
92
93 type reportType int
94
95 const (
96 reportNone reportType = iota
97 reportOOB
98 reportCall
99 reportBoth
100 )
101
102 func startServer(t *testing.T, r reportType) *testServer {
103 t.Helper()
104
105 smr := orca.NewServerMetricsRecorder()
106 cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder)
107
108 ss := &stubserver.StubServer{
109 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
110 if r := orca.CallMetricsRecorderFromContext(ctx); r != nil {
111
112 sm := cmr.(orca.ServerMetricsProvider).ServerMetrics()
113 r.SetApplicationUtilization(sm.AppUtilization)
114 r.SetQPS(sm.QPS)
115 r.SetEPS(sm.EPS)
116 }
117 return &testpb.Empty{}, nil
118 },
119 }
120
121 var sopts []grpc.ServerOption
122 if r == reportCall || r == reportBoth {
123 sopts = append(sopts, orca.CallMetricsServerOption(nil))
124 }
125
126 if r == reportOOB || r == reportBoth {
127 oso := orca.ServiceOptions{
128 ServerMetricsProvider: smr,
129 MinReportingInterval: 10 * time.Millisecond,
130 }
131 internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&oso)
132 sopts = append(sopts, stubserver.RegisterServiceServerOption(func(s *grpc.Server) {
133 if err := orca.Register(s, oso); err != nil {
134 t.Fatalf("Failed to register orca service: %v", err)
135 }
136 }))
137 }
138
139 if err := ss.StartServer(sopts...); err != nil {
140 t.Fatalf("Error starting server: %v", err)
141 }
142 t.Cleanup(ss.Stop)
143
144 return &testServer{
145 StubServer: ss,
146 oobMetrics: smr,
147 callMetrics: cmr,
148 }
149 }
150
151 func svcConfig(t *testing.T, wrrCfg iwrr.LBConfig) string {
152 t.Helper()
153 m, err := json.Marshal(wrrCfg)
154 if err != nil {
155 t.Fatalf("Error marshaling JSON %v: %v", wrrCfg, err)
156 }
157 sc := fmt.Sprintf(`{"loadBalancingConfig": [ {%q:%v} ] }`, wrr.Name, string(m))
158 t.Logf("Marshaled service config: %v", sc)
159 return sc
160 }
161
162
163
164 func (s) TestBalancer_OneAddress(t *testing.T) {
165 testCases := []struct {
166 rt reportType
167 cfg iwrr.LBConfig
168 }{
169 {rt: reportNone, cfg: perCallConfig},
170 {rt: reportCall, cfg: perCallConfig},
171 {rt: reportOOB, cfg: oobConfig},
172 }
173
174 for _, tc := range testCases {
175 t.Run(fmt.Sprintf("reportType:%v", tc.rt), func(t *testing.T) {
176 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
177 defer cancel()
178
179 srv := startServer(t, tc.rt)
180
181 sc := svcConfig(t, tc.cfg)
182 if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
183 t.Fatalf("Error starting client: %v", err)
184 }
185
186
187 for i := 0; i < 100; i++ {
188 srv.callMetrics.SetQPS(float64(i))
189 srv.oobMetrics.SetQPS(float64(i))
190 if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
191 t.Fatalf("Error from EmptyCall: %v", err)
192 }
193 time.Sleep(time.Millisecond)
194 }
195 })
196 }
197 }
198
199
200
201 func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) {
202 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
203 defer cancel()
204
205 srv1 := startServer(t, reportNone)
206 srv2 := startServer(t, reportNone)
207
208 sc := svcConfig(t, perCallConfig)
209 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
210 t.Fatalf("Error starting client: %v", err)
211 }
212 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
213 srv1.R.UpdateState(resolver.State{Addresses: addrs})
214
215
216 for i := 0; i < 20; i++ {
217 roundrobin.CheckRoundRobinRPCs(ctx, srv1.Client, addrs)
218 }
219 }
220
221
222
223 func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) {
224 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
225 defer cancel()
226
227 srv1 := startServer(t, reportCall)
228 srv2 := startServer(t, reportCall)
229
230
231
232 srv1.callMetrics.SetQPS(10.0)
233 srv1.callMetrics.SetApplicationUtilization(1.0)
234
235 srv2.callMetrics.SetQPS(10.0)
236 srv2.callMetrics.SetApplicationUtilization(.1)
237
238 sc := svcConfig(t, perCallConfig)
239 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
240 t.Fatalf("Error starting client: %v", err)
241 }
242 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
243 srv1.R.UpdateState(resolver.State{Addresses: addrs})
244
245
246 ensureReached(ctx, t, srv1.Client, 2)
247
248
249 time.Sleep(weightUpdatePeriod)
250 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
251 }
252
253
254
255 func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) {
256 testCases := []struct {
257 name string
258 utilSetter func(orca.ServerMetricsRecorder, float64)
259 }{{
260 name: "application_utilization",
261 utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
262 smr.SetApplicationUtilization(val)
263 },
264 }, {
265 name: "cpu_utilization",
266 utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
267 smr.SetCPUUtilization(val)
268 },
269 }, {
270 name: "application over cpu",
271 utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
272 smr.SetApplicationUtilization(val)
273 smr.SetCPUUtilization(2.0)
274 },
275 }}
276
277 for _, tc := range testCases {
278 t.Run(tc.name, func(t *testing.T) {
279 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
280 defer cancel()
281
282 srv1 := startServer(t, reportOOB)
283 srv2 := startServer(t, reportOOB)
284
285
286
287 srv1.oobMetrics.SetQPS(10.0)
288 tc.utilSetter(srv1.oobMetrics, 1.0)
289
290 srv2.oobMetrics.SetQPS(10.0)
291 tc.utilSetter(srv2.oobMetrics, 0.1)
292
293 sc := svcConfig(t, oobConfig)
294 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
295 t.Fatalf("Error starting client: %v", err)
296 }
297 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
298 srv1.R.UpdateState(resolver.State{Addresses: addrs})
299
300
301 ensureReached(ctx, t, srv1.Client, 2)
302
303
304 time.Sleep(weightUpdatePeriod)
305 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
306 })
307 }
308 }
309
310
311
312
313 func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) {
314 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
315 defer cancel()
316
317 srv1 := startServer(t, reportOOB)
318 srv2 := startServer(t, reportOOB)
319
320
321
322 srv1.oobMetrics.SetQPS(10.0)
323 srv1.oobMetrics.SetApplicationUtilization(1.0)
324
325 srv2.oobMetrics.SetQPS(10.0)
326 srv2.oobMetrics.SetApplicationUtilization(.1)
327
328 sc := svcConfig(t, oobConfig)
329 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
330 t.Fatalf("Error starting client: %v", err)
331 }
332 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
333 srv1.R.UpdateState(resolver.State{Addresses: addrs})
334
335
336 ensureReached(ctx, t, srv1.Client, 2)
337
338
339 time.Sleep(weightUpdatePeriod)
340 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
341
342
343
344 srv1.oobMetrics.SetQPS(10.0)
345 srv1.oobMetrics.SetApplicationUtilization(.1)
346
347 srv2.oobMetrics.SetQPS(10.0)
348 srv2.oobMetrics.SetApplicationUtilization(1.0)
349
350
351 time.Sleep(weightUpdatePeriod + oobReportingInterval)
352 checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1})
353 }
354
355
356
357
358 func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) {
359 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
360 defer cancel()
361
362 srv1 := startServer(t, reportBoth)
363 srv2 := startServer(t, reportBoth)
364
365
366
367 srv1.oobMetrics.SetQPS(10.0)
368 srv1.oobMetrics.SetApplicationUtilization(1.0)
369
370 srv2.oobMetrics.SetQPS(10.0)
371 srv2.oobMetrics.SetApplicationUtilization(.1)
372
373
374
375
376 srv1.callMetrics.SetQPS(10.0)
377 srv1.callMetrics.SetApplicationUtilization(.1)
378
379 srv2.callMetrics.SetQPS(10.0)
380 srv2.callMetrics.SetApplicationUtilization(1.0)
381
382 sc := svcConfig(t, oobConfig)
383 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
384 t.Fatalf("Error starting client: %v", err)
385 }
386 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
387 srv1.R.UpdateState(resolver.State{Addresses: addrs})
388
389
390 ensureReached(ctx, t, srv1.Client, 2)
391
392
393 time.Sleep(weightUpdatePeriod)
394 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
395
396
397 c := svcConfig(t, perCallConfig)
398 parsedCfg := srv1.R.CC.ParseServiceConfig(c)
399 if parsedCfg.Err != nil {
400 panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
401 }
402 srv1.R.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parsedCfg})
403
404
405 time.Sleep(weightUpdatePeriod)
406 checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1})
407 }
408
409
410
411 func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) {
412 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
413 defer cancel()
414
415 srv1 := startServer(t, reportOOB)
416 srv2 := startServer(t, reportOOB)
417
418
419
420
421
422
423 srv1.oobMetrics.SetQPS(10.0)
424 srv1.oobMetrics.SetApplicationUtilization(1.0)
425 srv1.oobMetrics.SetEPS(0)
426
427
428
429 srv2.oobMetrics.SetQPS(10.0)
430 srv2.oobMetrics.SetApplicationUtilization(.1)
431 srv2.oobMetrics.SetEPS(10.0)
432
433
434
435 sc := svcConfig(t, oobConfig)
436 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
437 t.Fatalf("Error starting client: %v", err)
438 }
439 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
440 srv1.R.UpdateState(resolver.State{Addresses: addrs})
441
442
443 ensureReached(ctx, t, srv1.Client, 2)
444
445
446 time.Sleep(weightUpdatePeriod)
447 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
448
449
450 newCfg := oobConfig
451 newCfg.ErrorUtilizationPenalty = float64p(0.9)
452 c := svcConfig(t, newCfg)
453 parsedCfg := srv1.R.CC.ParseServiceConfig(c)
454 if parsedCfg.Err != nil {
455 panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
456 }
457 srv1.R.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parsedCfg})
458
459
460 time.Sleep(weightUpdatePeriod + oobReportingInterval)
461 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
462 }
463
464
465
466 func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) {
467 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
468 defer cancel()
469
470 var mu sync.Mutex
471 start := time.Now()
472 now := start
473 setNow := func(t time.Time) {
474 mu.Lock()
475 defer mu.Unlock()
476 now = t
477 }
478
479 setTimeNow(func() time.Time {
480 mu.Lock()
481 defer mu.Unlock()
482 return now
483 })
484 t.Cleanup(func() { setTimeNow(time.Now) })
485
486 testCases := []struct {
487 blackoutPeriodCfg *string
488 blackoutPeriod time.Duration
489 }{{
490 blackoutPeriodCfg: stringp("1s"),
491 blackoutPeriod: time.Second,
492 }, {
493 blackoutPeriodCfg: nil,
494 blackoutPeriod: 10 * time.Second,
495 }}
496 for _, tc := range testCases {
497 setNow(start)
498 srv1 := startServer(t, reportOOB)
499 srv2 := startServer(t, reportOOB)
500
501
502
503 srv1.oobMetrics.SetQPS(10.0)
504 srv1.oobMetrics.SetApplicationUtilization(1.0)
505
506 srv2.oobMetrics.SetQPS(10.0)
507 srv2.oobMetrics.SetApplicationUtilization(.1)
508
509 cfg := oobConfig
510 cfg.BlackoutPeriod = tc.blackoutPeriodCfg
511 sc := svcConfig(t, cfg)
512 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
513 t.Fatalf("Error starting client: %v", err)
514 }
515 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
516 srv1.R.UpdateState(resolver.State{Addresses: addrs})
517
518
519 ensureReached(ctx, t, srv1.Client, 2)
520
521
522 time.Sleep(weightUpdatePeriod)
523
524 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
525
526
527
528 setNow(start.Add(tc.blackoutPeriod - time.Nanosecond))
529
530 time.Sleep(weightUpdatePeriod)
531 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
532
533
534
535 setNow(start.Add(tc.blackoutPeriod))
536
537 time.Sleep(weightUpdatePeriod)
538 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
539 }
540 }
541
542
543
544
545 func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
546 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
547 defer cancel()
548
549 var mu sync.Mutex
550 start := time.Now()
551 now := start
552 setNow := func(t time.Time) {
553 mu.Lock()
554 defer mu.Unlock()
555 now = t
556 }
557 setTimeNow(func() time.Time {
558 mu.Lock()
559 defer mu.Unlock()
560 return now
561 })
562 t.Cleanup(func() { setTimeNow(time.Now) })
563
564 srv1 := startServer(t, reportBoth)
565 srv2 := startServer(t, reportBoth)
566
567
568
569
570
571 srv1.oobMetrics.SetQPS(10.0)
572 srv1.oobMetrics.SetApplicationUtilization(1.0)
573
574 srv2.oobMetrics.SetQPS(10.0)
575 srv2.oobMetrics.SetApplicationUtilization(.1)
576
577 cfg := oobConfig
578 cfg.OOBReportingPeriod = stringp("60s")
579 sc := svcConfig(t, cfg)
580 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
581 t.Fatalf("Error starting client: %v", err)
582 }
583 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
584 srv1.R.UpdateState(resolver.State{Addresses: addrs})
585
586
587 ensureReached(ctx, t, srv1.Client, 2)
588
589
590 time.Sleep(weightUpdatePeriod)
591 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
592
593
594
595 setNow(start.Add(weightExpirationPeriod - time.Second))
596
597
598 time.Sleep(weightUpdatePeriod)
599 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
600
601
602
603 setNow(start.Add(weightExpirationPeriod + time.Second))
604
605
606 time.Sleep(weightUpdatePeriod)
607 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
608 }
609
610
611 func (s) TestBalancer_AddressesChanging(t *testing.T) {
612 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
613 defer cancel()
614
615 srv1 := startServer(t, reportBoth)
616 srv2 := startServer(t, reportBoth)
617 srv3 := startServer(t, reportBoth)
618 srv4 := startServer(t, reportBoth)
619
620
621 srv1.oobMetrics.SetQPS(10.0)
622 srv1.oobMetrics.SetApplicationUtilization(1.0)
623
624 srv2.oobMetrics.SetQPS(10.0)
625 srv2.oobMetrics.SetApplicationUtilization(.1)
626
627 srv3.oobMetrics.SetQPS(20.0)
628 srv3.oobMetrics.SetApplicationUtilization(1.0)
629
630 srv4.oobMetrics.SetQPS(20.0)
631 srv4.oobMetrics.SetApplicationUtilization(.1)
632
633 sc := svcConfig(t, oobConfig)
634 if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
635 t.Fatalf("Error starting client: %v", err)
636 }
637 srv2.Client = srv1.Client
638 addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}, {Addr: srv3.Address}}
639 srv1.R.UpdateState(resolver.State{Addresses: addrs})
640
641
642 ensureReached(ctx, t, srv1.Client, 3)
643 time.Sleep(weightUpdatePeriod)
644 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2})
645
646
647 addrs = append(addrs, resolver.Address{Addr: srv4.Address})
648 srv1.R.UpdateState(resolver.State{Addresses: addrs})
649 time.Sleep(weightUpdatePeriod)
650 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2}, srvWeight{srv4, 20})
651
652
653 srv3.Stop()
654 time.Sleep(weightUpdatePeriod)
655 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv4, 20})
656
657
658 addrs = []resolver.Address{{Addr: srv1.Address}, {Addr: srv4.Address}}
659 srv1.R.UpdateState(resolver.State{Addresses: addrs})
660 time.Sleep(weightUpdatePeriod)
661 checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv4, 20})
662
663
664 addrs = []resolver.Address{{Addr: srv2.Address}}
665 srv1.R.UpdateState(resolver.State{Addresses: addrs})
666 time.Sleep(weightUpdatePeriod)
667 checkWeights(ctx, t, srvWeight{srv2, 10})
668
669
670 addrs = append(addrs, resolver.Address{Addr: srv4.Address})
671 srv1.R.UpdateState(resolver.State{Addresses: addrs})
672 time.Sleep(weightUpdatePeriod)
673 checkWeights(ctx, t, srvWeight{srv2, 10}, srvWeight{srv4, 20})
674 }
675
676 func ensureReached(ctx context.Context, t *testing.T, c testgrpc.TestServiceClient, n int) {
677 t.Helper()
678 reached := make(map[string]struct{})
679 for len(reached) != n {
680 var peer peer.Peer
681 if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
682 t.Fatalf("Error from EmptyCall: %v", err)
683 }
684 reached[peer.Addr.String()] = struct{}{}
685 }
686 }
687
688 type srvWeight struct {
689 srv *testServer
690 w int
691 }
692
693 const rrIterations = 100
694
695
696
697
698 func checkWeights(ctx context.Context, t *testing.T, sws ...srvWeight) {
699 t.Helper()
700
701 c := sws[0].srv.Client
702
703
704
705 weightSum := 0
706 for _, sw := range sws {
707 weightSum += sw.w
708 }
709 for i := range sws {
710 sws[i].w = rrIterations * sws[i].w / weightSum
711 }
712
713 for attempts := 0; attempts < 10; attempts++ {
714 serverCounts := make(map[string]int)
715 for i := 0; i < rrIterations; i++ {
716 var peer peer.Peer
717 if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
718 t.Fatalf("Error from EmptyCall: %v; timed out waiting for weighted RR behavior?", err)
719 }
720 serverCounts[peer.Addr.String()]++
721 }
722 if len(serverCounts) != len(sws) {
723 continue
724 }
725 success := true
726 for _, sw := range sws {
727 c := serverCounts[sw.srv.Address]
728 if c < sw.w-2 || c > sw.w+2 {
729 success = false
730 break
731 }
732 }
733 if success {
734 t.Logf("Passed iteration %v; counts: %v", attempts, serverCounts)
735 return
736 }
737 t.Logf("Failed iteration %v; counts: %v; want %+v", attempts, serverCounts, sws)
738 time.Sleep(5 * time.Millisecond)
739 }
740 t.Fatalf("Failed to route RPCs with proper ratio")
741 }
742
743 func init() {
744 setTimeNow(time.Now)
745 iwrr.TimeNow = timeNow
746 }
747
748 var timeNowFunc atomic.Value
749
750 func timeNow() time.Time {
751 return timeNowFunc.Load().(func() time.Time)()
752 }
753
754 func setTimeNow(f func() time.Time) {
755 timeNowFunc.Store(f)
756 }
757
View as plain text