1
18
19 package outlierdetection
20
21 import (
22 "context"
23 "encoding/json"
24 "errors"
25 "fmt"
26 "math"
27 "strings"
28 "sync"
29 "testing"
30 "time"
31
32 "github.com/google/go-cmp/cmp"
33 "github.com/google/go-cmp/cmp/cmpopts"
34 "google.golang.org/grpc/balancer"
35 "google.golang.org/grpc/connectivity"
36 "google.golang.org/grpc/internal/balancer/stub"
37 "google.golang.org/grpc/internal/channelz"
38 "google.golang.org/grpc/internal/grpcsync"
39 "google.golang.org/grpc/internal/grpctest"
40 iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
41 "google.golang.org/grpc/internal/testutils"
42 "google.golang.org/grpc/resolver"
43 "google.golang.org/grpc/serviceconfig"
44 "google.golang.org/grpc/xds/internal/balancer/clusterimpl"
45 )
46
// Timeouts shared by the tests in this file.
var (
	// defaultTestTimeout bounds waits for events the tests expect to happen;
	// hitting it is reported as a test failure.
	defaultTestTimeout = 5 * time.Second
	// defaultTestShortTimeout bounds waits for events the tests expect NOT to
	// happen; the tests fail if something arrives before it expires.
	defaultTestShortTimeout = 10 * time.Millisecond
)
51
// s is the receiver type for every `func (s) TestXxx` method in this file.
// Test passes a value of it to grpctest.RunSubTests.
type s struct {
	grpctest.Tester
}
55
56 func Test(t *testing.T) {
57 grpctest.RunSubTests(t, s{})
58 }
59
60
61
// TestParseConfig feeds a table of JSON configs to bb.ParseConfig. Each case
// either names the LBConfig it expects back (wantCfg) or a substring that the
// returned error must contain (wantErr).
func (s) TestParseConfig(t *testing.T) {
	const errParseConfigName = "errParseConfigBalancer"
	// Register a stub child policy whose ParseConfig always fails. The
	// "child-policy-present-but-parse-error" case refers to it by name.
	stub.Register(errParseConfigName, stub.BalancerFuncs{
		ParseConfig: func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
			return nil, errors.New("some error")
		},
	})

	parser := bb{}
	// Values the cases below expect for fields that the JSON input leaves
	// unset.
	const (
		defaultInterval                       = iserviceconfig.Duration(10 * time.Second)
		defaultBaseEjectionTime               = iserviceconfig.Duration(30 * time.Second)
		defaultMaxEjectionTime                = iserviceconfig.Duration(300 * time.Second)
		defaultMaxEjectionPercent             = 10
		defaultSuccessRateStdevFactor         = 1900
		defaultEnforcingSuccessRate           = 100
		defaultSuccessRateMinimumHosts        = 5
		defaultSuccessRateRequestVolume       = 100
		defaultFailurePercentageThreshold     = 85
		defaultEnforcingFailurePercentage     = 0
		defaultFailurePercentageMinimumHosts  = 5
		defaultFailurePercentageRequestVolume = 50
	)
	tests := []struct {
		name    string
		input   string                            // raw JSON handed to ParseConfig
		wantCfg serviceconfig.LoadBalancingConfig // expected result; only checked when wantErr is empty
		wantErr string                            // expected error substring; empty means "no error expected"
	}{
		{
			// Only the child policy is given: all top-level fields are
			// expected to take their defaults and neither ejection
			// algorithm is expected to be set.
			name: "no-fields-set-should-get-default",
			input: `{
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,
			wantCfg: &LBConfig{
				Interval:           defaultInterval,
				BaseEjectionTime:   defaultBaseEjectionTime,
				MaxEjectionTime:    defaultMaxEjectionTime,
				MaxEjectionPercent: defaultMaxEjectionPercent,
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},

		{
			// Explicit top-level values are kept; the remaining top-level
			// fields are expected to take their defaults.
			name: "some-top-level-fields-set",
			input: `{
"interval": "15s",
"maxEjectionTime": "350s",
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,

			wantCfg: &LBConfig{
				Interval:           iserviceconfig.Duration(15 * time.Second),
				BaseEjectionTime:   defaultBaseEjectionTime,
				MaxEjectionTime:    iserviceconfig.Duration(350 * time.Second),
				MaxEjectionPercent: defaultMaxEjectionPercent,
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		{
			// An empty successRateEjection object is expected to yield a
			// SuccessRateEjection populated entirely with defaults.
			name: "success-rate-ejection-present-but-no-fields",
			input: `{
"successRateEjection": {},
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,

			wantCfg: &LBConfig{
				Interval:           defaultInterval,
				BaseEjectionTime:   defaultBaseEjectionTime,
				MaxEjectionTime:    defaultMaxEjectionTime,
				MaxEjectionPercent: defaultMaxEjectionPercent,
				SuccessRateEjection: &SuccessRateEjection{
					StdevFactor:           defaultSuccessRateStdevFactor,
					EnforcementPercentage: defaultEnforcingSuccessRate,
					MinimumHosts:          defaultSuccessRateMinimumHosts,
					RequestVolume:         defaultSuccessRateRequestVolume,
				},
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		{
			// Fields given inside successRateEjection are kept; the ones
			// left out are expected to take their defaults.
			name: "success-rate-ejection-present-partially-set",
			input: `{
"successRateEjection": {
"stdevFactor": 1000,
"minimumHosts": 5
},
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,

			wantCfg: &LBConfig{
				Interval:           defaultInterval,
				BaseEjectionTime:   defaultBaseEjectionTime,
				MaxEjectionTime:    defaultMaxEjectionTime,
				MaxEjectionPercent: defaultMaxEjectionPercent,
				SuccessRateEjection: &SuccessRateEjection{
					StdevFactor:           1000,
					EnforcementPercentage: defaultEnforcingSuccessRate,
					MinimumHosts:          5,
					RequestVolume:         defaultSuccessRateRequestVolume,
				},
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		{
			// Every successRateEjection field is given explicitly.
			name: "success-rate-ejection-present-fully-set",
			input: `{
"successRateEjection": {
"stdevFactor": 1000,
"enforcementPercentage": 50,
"minimumHosts": 5,
"requestVolume": 50
},
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,
			wantCfg: &LBConfig{
				Interval:           defaultInterval,
				BaseEjectionTime:   defaultBaseEjectionTime,
				MaxEjectionTime:    defaultMaxEjectionTime,
				MaxEjectionPercent: defaultMaxEjectionPercent,
				SuccessRateEjection: &SuccessRateEjection{
					StdevFactor:           1000,
					EnforcementPercentage: 50,
					MinimumHosts:          5,
					RequestVolume:         50,
				},
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		{
			// An empty failurePercentageEjection object is expected to yield
			// a FailurePercentageEjection populated entirely with defaults.
			name: "failure-percentage-ejection-present-but-no-fields",
			input: `{
"failurePercentageEjection": {},
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,

			wantCfg: &LBConfig{
				Interval:           defaultInterval,
				BaseEjectionTime:   defaultBaseEjectionTime,
				MaxEjectionTime:    defaultMaxEjectionTime,
				MaxEjectionPercent: defaultMaxEjectionPercent,
				FailurePercentageEjection: &FailurePercentageEjection{
					Threshold:             defaultFailurePercentageThreshold,
					EnforcementPercentage: defaultEnforcingFailurePercentage,
					MinimumHosts:          defaultFailurePercentageMinimumHosts,
					RequestVolume:         defaultFailurePercentageRequestVolume,
				},
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		{
			// Fields given inside failurePercentageEjection are kept; the
			// ones left out are expected to take their defaults.
			name: "failure-percentage-ejection-present-partially-set",
			input: `{
"failurePercentageEjection": {
"threshold": 80,
"minimumHosts": 10
},
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,

			wantCfg: &LBConfig{
				Interval:           defaultInterval,
				BaseEjectionTime:   defaultBaseEjectionTime,
				MaxEjectionTime:    defaultMaxEjectionTime,
				MaxEjectionPercent: defaultMaxEjectionPercent,
				FailurePercentageEjection: &FailurePercentageEjection{
					Threshold:             80,
					EnforcementPercentage: defaultEnforcingFailurePercentage,
					MinimumHosts:          10,
					RequestVolume:         defaultFailurePercentageRequestVolume,
				},
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		{
			// Every failurePercentageEjection field is given explicitly.
			name: "failure-percentage-ejection-present-fully-set",
			input: `{
"failurePercentageEjection": {
"threshold": 80,
"enforcementPercentage": 100,
"minimumHosts": 10,
"requestVolume": 40
},
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,
			wantCfg: &LBConfig{
				Interval:           defaultInterval,
				BaseEjectionTime:   defaultBaseEjectionTime,
				MaxEjectionTime:    defaultMaxEjectionTime,
				MaxEjectionPercent: defaultMaxEjectionPercent,
				FailurePercentageEjection: &FailurePercentageEjection{
					Threshold:             80,
					EnforcementPercentage: 100,
					MinimumHosts:          10,
					RequestVolume:         40,
				},
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		{
			// Explicit zero values are expected to be kept as zero rather
			// than being replaced by the defaults.
			name: "lb-config-every-field-set-zero-value",
			input: `{
"interval": "0s",
"baseEjectionTime": "0s",
"maxEjectionTime": "0s",
"maxEjectionPercent": 0,
"successRateEjection": {
"stdevFactor": 0,
"enforcementPercentage": 0,
"minimumHosts": 0,
"requestVolume": 0
},
"failurePercentageEjection": {
"threshold": 0,
"enforcementPercentage": 0,
"minimumHosts": 0,
"requestVolume": 0
},
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,
			wantCfg: &LBConfig{
				SuccessRateEjection:       &SuccessRateEjection{},
				FailurePercentageEjection: &FailurePercentageEjection{},
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		{
			// Every field of the config is given a non-zero value.
			name: "lb-config-every-field-set",
			input: `{
"interval": "10s",
"baseEjectionTime": "30s",
"maxEjectionTime": "300s",
"maxEjectionPercent": 10,
"successRateEjection": {
"stdevFactor": 1900,
"enforcementPercentage": 100,
"minimumHosts": 5,
"requestVolume": 100
},
"failurePercentageEjection": {
"threshold": 85,
"enforcementPercentage": 5,
"minimumHosts": 5,
"requestVolume": 50
},
"childPolicy": [
{
"xds_cluster_impl_experimental": {
"cluster": "test_cluster"
}
}
]
}`,
			wantCfg: &LBConfig{
				Interval:           iserviceconfig.Duration(10 * time.Second),
				BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
				MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
				MaxEjectionPercent: 10,
				SuccessRateEjection: &SuccessRateEjection{
					StdevFactor:           1900,
					EnforcementPercentage: 100,
					MinimumHosts:          5,
					RequestVolume:         100,
				},
				FailurePercentageEjection: &FailurePercentageEjection{
					Threshold:             85,
					EnforcementPercentage: 5,
					MinimumHosts:          5,
					RequestVolume:         50,
				},
				ChildPolicy: &iserviceconfig.BalancerConfig{
					Name: "xds_cluster_impl_experimental",
					Config: &clusterimpl.LBConfig{
						Cluster: "test_cluster",
					},
				},
			},
		},
		// The remaining cases are expected to fail; only the error text is
		// checked for them.
		{
			name:    "interval-is-negative",
			input:   `{"interval": "-10s"}`,
			wantErr: "OutlierDetectionLoadBalancingConfig.interval = -10s; must be >= 0",
		},
		{
			name:    "base-ejection-time-is-negative",
			input:   `{"baseEjectionTime": "-10s"}`,
			wantErr: "OutlierDetectionLoadBalancingConfig.base_ejection_time = -10s; must be >= 0",
		},
		{
			name:    "max-ejection-time-is-negative",
			input:   `{"maxEjectionTime": "-10s"}`,
			wantErr: "OutlierDetectionLoadBalancingConfig.max_ejection_time = -10s; must be >= 0",
		},
		{
			name:    "max-ejection-percent-is-greater-than-100",
			input:   `{"maxEjectionPercent": 150}`,
			wantErr: "OutlierDetectionLoadBalancingConfig.max_ejection_percent = 150; must be <= 100",
		},
		{
			name: "enforcement-percentage-success-rate-is-greater-than-100",
			input: `{
"successRateEjection": {
"enforcementPercentage": 150
}
}`,
			wantErr: "OutlierDetectionLoadBalancingConfig.SuccessRateEjection.enforcement_percentage = 150; must be <= 100",
		},
		{
			name: "failure-percentage-threshold-is-greater-than-100",
			input: `{
"failurePercentageEjection": {
"threshold": 150
}
}`,
			wantErr: "OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.threshold = 150; must be <= 100",
		},
		{
			name: "enforcement-percentage-failure-percentage-ejection-is-greater-than-100",
			input: `{
"failurePercentageEjection": {
"enforcementPercentage": 150
}
}`,
			wantErr: "OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.enforcement_percentage = 150; must be <= 100",
		},
		{
			// Names the stub registered at the top of this test, whose
			// ParseConfig always returns an error.
			name: "child-policy-present-but-parse-error",
			input: `{
"childPolicy": [
{
"errParseConfigBalancer": {
"cluster": "test_cluster"
}
}
]
}`,
			wantErr: "error parsing loadBalancingConfig for policy \"errParseConfigBalancer\"",
		},
		{
			// Names a child policy that this test never registers.
			name: "no-supported-child-policy",
			input: `{
"childPolicy": [
{
"doesNotExistBalancer": {
"cluster": "test_cluster"
}
}
]
}`,
			wantErr: "invalid loadBalancingConfig: no supported policies found",
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
			// A returned error must contain the expected substring. An empty
			// wantErr is contained in every string, so an unexpected error
			// passes this check and is caught by the next one.
			if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
			}
			// Whether an error occurred must match whether one was expected.
			if (gotErr != nil) != (test.wantErr != "") {
				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
			}
			// Error cases carry no wantCfg, so there is nothing to compare.
			if test.wantErr != "" {
				return
			}
			if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" {
				t.Fatalf("parseConfig(%v) got unexpected output, diff (-got +want): %v", string(test.input), diff)
			}
		})
	}
}
533
534 func (lbc *LBConfig) Equal(lbc2 *LBConfig) bool {
535 if !lbc.EqualIgnoringChildPolicy(lbc2) {
536 return false
537 }
538 return cmp.Equal(lbc.ChildPolicy, lbc2.ChildPolicy)
539 }
540
// subConnWithState pairs a SubConn with a state reported for it. The
// StateListener callbacks in these tests send one on a channel for every
// state update so the test body can check which SubConn saw which state.
type subConnWithState struct {
	sc    balancer.SubConn
	state balancer.SubConnState
}
545
546 func setup(t *testing.T) (*outlierDetectionBalancer, *testutils.BalancerClientConn, func()) {
547 t.Helper()
548 builder := balancer.Get(Name)
549 if builder == nil {
550 t.Fatalf("balancer.Get(%q) returned nil", Name)
551 }
552 tcc := testutils.NewBalancerClientConn(t)
553 ch := channelz.RegisterChannel(nil, "test channel")
554 t.Cleanup(func() { channelz.RemoveEntry(ch.ID) })
555 odB := builder.Build(tcc, balancer.BuildOptions{ChannelzParent: ch})
556 return odB.(*outlierDetectionBalancer), tcc, odB.Close
557 }
558
// emptyChildConfig is a LoadBalancingConfig with no fields of its own. The
// tests use it as the child policy's config when the content does not
// matter.
type emptyChildConfig struct {
	serviceconfig.LoadBalancingConfig
}
562
563
564
565
566
567
568
569
570
571
572
573
// TestChildBasicOperations sends the outlier detection balancer two config
// updates that name different stub child policies, then closes it. It checks
// that:
//   - the first child receives the child config from the first update;
//   - after the second update the ClientConn sees the READY state published
//     by the second child, the first child is closed, and the second child
//     receives its update;
//   - closing the balancer closes the second child.
func (s) TestChildBasicOperations(t *testing.T) {
	bc := emptyChildConfig{}

	// ccsCh is signalled by each child's UpdateClientConnState, closeCh by
	// each child's Close.
	ccsCh := testutils.NewChannel()
	closeCh := testutils.NewChannel()

	// child1 reports the balancer config it was handed.
	stub.Register(t.Name()+"child1", stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			ccsCh.Send(ccs.BalancerConfig)
			return nil
		},
		Close: func(bd *stub.BalancerData) {
			closeCh.Send(nil)
		},
	})

	// child2 publishes READY with a constant picker from inside
	// UpdateClientConnState, before signalling that it received the update.
	stub.Register(t.Name()+"child2", stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
			bd.ClientConn.UpdateState(balancer.State{
				ConnectivityState: connectivity.Ready,
				Picker:            &testutils.TestConstPicker{},
			})
			ccsCh.Send(nil)
			return nil
		},
		Close: func(bd *stub.BalancerData) {
			closeCh.Send(nil)
		},
	})

	// The cleanup func is discarded because the test calls od.Close() itself
	// at the end.
	od, tcc, _ := setup(t)

	// First update: selects child1 and passes bc as its config.
	od.UpdateClientConnState(balancer.ClientConnState{
		BalancerConfig: &LBConfig{
			ChildPolicy: &iserviceconfig.BalancerConfig{
				Name:   t.Name() + "child1",
				Config: bc,
			},
		},
	})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	cr, err := ccsCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timed out waiting for UpdateClientConnState on the first child balancer: %v", err)
	}
	if _, ok := cr.(emptyChildConfig); !ok {
		t.Fatalf("Received child policy config of type %T, want %T", cr, emptyChildConfig{})
	}

	// Second update: switches the child policy to child2.
	// NOTE(review): Interval is set to math.MaxInt64, presumably so the
	// interval timer never fires on its own during the test — confirm.
	od.UpdateClientConnState(balancer.ClientConnState{
		BalancerConfig: &LBConfig{
			Interval: math.MaxInt64,
			ChildPolicy: &iserviceconfig.BalancerConfig{
				Name:   t.Name() + "child2",
				Config: emptyChildConfig{},
			},
		},
	})

	// The READY state published by child2 must reach the ClientConn.
	select {
	case <-ctx.Done():
		t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
	case state := <-tcc.NewStateCh:
		if state != connectivity.Ready {
			t.Fatalf("ClientConn received connectivity state %v, want %v", state, connectivity.Ready)
		}
	}

	// child1 must have been closed as part of the switch.
	if _, err = closeCh.Receive(ctx); err != nil {
		t.Fatalf("timed out waiting for the first child balancer to be closed: %v", err)
	}
	// child2 must have received its UpdateClientConnState call.
	if _, err = ccsCh.Receive(ctx); err != nil {
		t.Fatalf("timed out waiting for UpdateClientConnState on the second child balancer: %v", err)
	}

	// Closing the outlier detection balancer must close child2.
	od.Close()
	if _, err = closeCh.Receive(ctx); err != nil {
		t.Fatalf("timed out waiting for the second child balancer to be closed: %v", err)
	}
}
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
// TestUpdateAddresses checks how od.UpdateAddresses interacts with ejection.
// It first gets the SubConn for "address2" ejected, then moves scw1 between
// address lists and checks the state updates delivered to scw1's listener:
//   - moved to the single ejected address: TRANSIENT_FAILURE;
//   - moved to two addresses: IDLE;
//   - moved to a different pair of addresses: no update;
//   - moved back to the single ejected address: TRANSIENT_FAILURE.
func (s) TestUpdateAddresses(t *testing.T) {
	// scsCh receives a subConnWithState for every StateListener callback.
	scsCh := testutils.NewChannel()
	var scw1, scw2 balancer.SubConn
	var err error
	// The stub child creates one SubConn per address and publishes READY with
	// a picker that alternates between them, starting with scw1.
	stub.Register(t.Name(), stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
			scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{
				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw1, state: state}) },
			})
			if err != nil {
				t.Errorf("error in od.NewSubConn call: %v", err)
			}
			scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{
				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw2, state: state}) },
			})
			if err != nil {
				t.Errorf("error in od.NewSubConn call: %v", err)
			}
			bd.ClientConn.UpdateState(balancer.State{
				ConnectivityState: connectivity.Ready,
				Picker: &rrPicker{
					scs: []balancer.SubConn{scw1, scw2},
				},
			})
			return nil
		},
	})

	od, tcc, cleanup := setup(t)
	defer cleanup()

	od.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{
			Addresses: []resolver.Address{
				{Addr: "address1"},
				{Addr: "address2"},
			},
		},
		BalancerConfig: &LBConfig{
			Interval:           iserviceconfig.Duration(10 * time.Second),
			BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
			MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
			MaxEjectionPercent: 10,
			FailurePercentageEjection: &FailurePercentageEjection{
				Threshold:             50,
				EnforcementPercentage: 100,
				MinimumHosts:          2,
				RequestVolume:         3,
			},
			ChildPolicy: &iserviceconfig.BalancerConfig{
				Name:   t.Name(),
				Config: emptyChildConfig{},
			},
		},
	})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Get to a state where one address is ejected and the other is not.
	select {
	case <-ctx.Done():
		t.Fatal("timeout while waiting for a UpdateState call on the ClientConn")
	case picker := <-tcc.NewPickerCh:
		pi, err := picker.Pick(balancer.PickInfo{})
		if err != nil {
			t.Fatalf("picker.Pick failed with error: %v", err)
		}
		// Report 5 successful RPCs on the result of the first pick.
		for c := 0; c < 5; c++ {
			pi.Done(balancer.DoneInfo{})
		}
		pi, err = picker.Pick(balancer.PickInfo{})
		if err != nil {
			t.Fatalf("picker.Pick failed with error: %v", err)
		}
		// Report 5 failed RPCs on the result of the second pick. The test
		// expects this to get scw2 ejected when the interval algorithm runs.
		for c := 0; c < 5; c++ {
			pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
		}
		// Run the interval algorithm directly instead of waiting for the
		// timer.
		od.intervalTimerAlgorithm()

		// scw2's listener must be told TRANSIENT_FAILURE.
		gotSCWS, err := scsCh.Receive(ctx)
		if err != nil {
			t.Fatalf("Error waiting for Sub Conn update: %v", err)
		}
		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
			sc:    scw2,
			state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
		}); err != nil {
			t.Fatalf("Error in Sub Conn update: %v", err)
		}
	}

	// Move scw1 to the single address whose SubConn was just ejected.
	od.UpdateAddresses(scw1, []resolver.Address{{Addr: "address2"}})

	// The address update must be forwarded to the ClientConn.
	select {
	case <-ctx.Done():
		t.Fatal("timeout while waiting for a UpdateState call on the ClientConn")
	case <-tcc.UpdateAddressesAddrsCh:
	}

	// scw1's listener must now be told TRANSIENT_FAILURE as well.
	gotSCWS, err := scsCh.Receive(ctx)
	if err != nil {
		t.Fatalf("Error waiting for Sub Conn update: %v", err)
	}
	if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
		sc:    scw1,
		state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
	}); err != nil {
		t.Fatalf("Error in Sub Conn update: %v", err)
	}

	// Move scw1 to a list of two addresses.
	od.UpdateAddresses(scw1, []resolver.Address{
		{Addr: "address1"},
		{Addr: "address2"},
	})

	// scw1's listener must be told IDLE.
	gotSCWS, err = scsCh.Receive(ctx)
	if err != nil {
		t.Fatalf("Error waiting for Sub Conn update: %v", err)
	}
	if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
		sc:    scw1,
		state: balancer.SubConnState{ConnectivityState: connectivity.Idle},
	}); err != nil {
		t.Fatalf("Error in Sub Conn update: %v", err)
	}

	// Move scw1 from one two-address list to a different two-address list.
	od.UpdateAddresses(scw1, []resolver.Address{
		{Addr: "address2"},
		{Addr: "address3"},
	})

	// No listener may be called for that change; the receive must time out.
	sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
	defer cancel()
	if _, err := scsCh.Receive(sCtx); err == nil {
		t.Fatalf("no SubConn update should have been sent (no SubConn got ejected/unejected)")
	}

	// Move scw1 back to the single ejected address.
	od.UpdateAddresses(scw1, []resolver.Address{{Addr: "address2"}})

	// scw1's listener must be told TRANSIENT_FAILURE again.
	gotSCWS, err = scsCh.Receive(ctx)
	if err != nil {
		t.Fatalf("Error waiting for Sub Conn update: %v", err)
	}
	if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
		sc:    scw1,
		state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
	}); err != nil {
		t.Fatalf("Error in Sub Conn update: %v", err)
	}
}
853
854 func scwsEqual(gotSCWS subConnWithState, wantSCWS subConnWithState) error {
855 if gotSCWS.sc != wantSCWS.sc || !cmp.Equal(gotSCWS.state, wantSCWS.state, cmp.AllowUnexported(subConnWrapper{}, addressInfo{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) {
856 return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCWS, wantSCWS)
857 }
858 return nil
859 }
860
// rrPicker is a minimal picker used by the stub child policies in these
// tests. Its Pick method hands out the SubConns in scs in order, wrapping
// around at the end.
type rrPicker struct {
	scs  []balancer.SubConn // SubConns to cycle through
	next int                // index into scs of the SubConn the next Pick returns
}
865
866 func (rrp *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
867 sc := rrp.scs[rrp.next]
868 rrp.next = (rrp.next + 1) % len(rrp.scs)
869 return balancer.PickResult{SubConn: sc}, nil
870 }
871
872
873
874
875
876
877
878
879
880
// TestDurationOfInterval replaces the package-level afterFunc and now hooks
// so it can observe the duration the balancer asks its interval timer to run
// for, across three config updates:
//   - the first (Interval 8s, success-rate ejection set) must request exactly
//     8s;
//   - the second (Interval 9s), with now() shifted 5s into the future, must
//     request roughly 4s (anything from 3.5s to 4.5s is accepted);
//   - the third, which sets neither ejection algorithm, must not call
//     afterFunc at all within defaultTestShortTimeout.
func (s) TestDurationOfInterval(t *testing.T) {
	stub.Register(t.Name(), stub.BalancerFuncs{})

	od, _, cleanup := setup(t)
	// On exit, close the balancer first and then restore the original
	// afterFunc, which is captured here as the deferred call's argument.
	defer func(af func(d time.Duration, f func()) *time.Timer) {
		cleanup()
		afterFunc = af
	}(afterFunc)

	// The replacement afterFunc reports the requested duration on
	// durationChan and never runs the callback. The timer it returns is set
	// to the maximum duration, so it will not fire during the test.
	durationChan := testutils.NewChannel()
	afterFunc = func(dur time.Duration, _ func()) *time.Timer {
		durationChan.Send(dur)
		return time.NewTimer(math.MaxInt64)
	}

	od.UpdateClientConnState(balancer.ClientConnState{
		BalancerConfig: &LBConfig{
			Interval: iserviceconfig.Duration(8 * time.Second),
			SuccessRateEjection: &SuccessRateEjection{
				StdevFactor:           1900,
				EnforcementPercentage: 100,
				MinimumHosts:          5,
				RequestVolume:         100,
			},
			ChildPolicy: &iserviceconfig.BalancerConfig{
				Name:   t.Name(),
				Config: emptyChildConfig{},
			},
		},
	})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	d, err := durationChan.Receive(ctx)
	if err != nil {
		t.Fatalf("Error receiving duration from afterFunc() call: %v", err)
	}
	dur := d.(time.Duration)
	// The first timer must be requested for exactly the configured interval.
	if dur != 8*time.Second {
		t.Fatalf("configured duration should have been 8 seconds to start timer")
	}

	// Make now() report a time 5 seconds ahead of the real clock, and restore
	// the original hook on exit.
	defer func(n func() time.Time) {
		now = n
	}(now)
	now = func() time.Time {
		return time.Now().Add(time.Second * 5)
	}

	// With a 9s interval and the clock shifted by 5s, the test expects the
	// next timer to be requested for about 4s.
	od.UpdateClientConnState(balancer.ClientConnState{
		BalancerConfig: &LBConfig{
			Interval: iserviceconfig.Duration(9 * time.Second),
			SuccessRateEjection: &SuccessRateEjection{
				StdevFactor:           1900,
				EnforcementPercentage: 100,
				MinimumHosts:          5,
				RequestVolume:         100,
			},
			ChildPolicy: &iserviceconfig.BalancerConfig{
				Name:   t.Name(),
				Config: emptyChildConfig{},
			},
		},
	})

	d, err = durationChan.Receive(ctx)
	if err != nil {
		t.Fatalf("Error receiving duration from afterFunc() call: %v", err)
	}
	dur = d.(time.Duration)
	// Allow half a second either way, since real time passes between the
	// calls.
	if dur.Seconds() < 3.5 || 4.5 < dur.Seconds() {
		t.Fatalf("configured duration should have been around 4 seconds to start timer")
	}

	// A config with neither SuccessRateEjection nor FailurePercentageEjection
	// set.
	od.UpdateClientConnState(balancer.ClientConnState{
		BalancerConfig: &LBConfig{
			Interval: iserviceconfig.Duration(10 * time.Second),
			ChildPolicy: &iserviceconfig.BalancerConfig{
				Name:   t.Name(),
				Config: emptyChildConfig{},
			},
		},
	})

	// afterFunc must not be called for that config; the receive must time
	// out.
	sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
	defer cancel()
	if _, err = durationChan.Receive(sCtx); err == nil {
		t.Fatal("No timer should have started.")
	}
}
980
981
982
983
984
985
986
987
988
989
990
// TestEjectUnejectSuccessRate runs the interval algorithm by hand against
// three SubConns and checks the state updates their listeners receive:
//   - after only successful RPCs, no listener is called;
//   - an ordinary state update for a non-ejected SubConn is forwarded;
//   - after the third picked SubConn reports only failures, exactly one
//     update (TRANSIENT_FAILURE for scw3) is delivered;
//   - a state update for the ejected SubConn is not forwarded;
//   - after now() is moved 1000 seconds ahead and the algorithm runs again,
//     scw3's listener receives the CONNECTING state that was withheld.
//
// NOTE(review): despite the test name, the config below sets
// FailurePercentageEjection, not SuccessRateEjection — confirm this is
// intended.
func (s) TestEjectUnejectSuccessRate(t *testing.T) {
	// scsCh receives a subConnWithState for every StateListener callback.
	scsCh := testutils.NewChannel()
	var scw1, scw2, scw3 balancer.SubConn
	var err error
	// The stub child creates one SubConn per address and publishes READY with
	// a picker that cycles scw1, scw2, scw3.
	stub.Register(t.Name(), stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
			scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{
				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw1, state: state}) },
			})
			if err != nil {
				t.Errorf("error in od.NewSubConn call: %v", err)
			}
			scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{
				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw2, state: state}) },
			})
			if err != nil {
				t.Errorf("error in od.NewSubConn call: %v", err)
			}
			scw3, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{
				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw3, state: state}) },
			})
			if err != nil {
				t.Errorf("error in od.NewSubConn call: %v", err)
			}
			bd.ClientConn.UpdateState(balancer.State{
				ConnectivityState: connectivity.Ready,
				Picker: &rrPicker{
					scs: []balancer.SubConn{scw1, scw2, scw3},
				},
			})
			return nil
		},
	})

	od, tcc, cleanup := setup(t)
	defer func() {
		cleanup()
	}()

	// NOTE(review): Interval is math.MaxInt64, presumably so the algorithm
	// only runs when the test calls od.intervalTimerAlgorithm() — confirm.
	od.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{
			Addresses: []resolver.Address{
				{Addr: "address1"},
				{Addr: "address2"},
				{Addr: "address3"},
			},
		},
		BalancerConfig: &LBConfig{
			Interval:           math.MaxInt64,
			BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
			MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
			MaxEjectionPercent: 10,
			FailurePercentageEjection: &FailurePercentageEjection{
				Threshold:             50,
				EnforcementPercentage: 100,
				MinimumHosts:          3,
				RequestVolume:         3,
			},
			ChildPolicy: &iserviceconfig.BalancerConfig{
				Name:   t.Name(),
				Config: emptyChildConfig{},
			},
		},
	})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	select {
	case <-ctx.Done():
		t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
	case picker := <-tcc.NewPickerCh:
		// Three picks, each reporting 5 successful RPCs.
		for i := 0; i < 3; i++ {
			pi, err := picker.Pick(balancer.PickInfo{})
			if err != nil {
				t.Fatalf("picker.Pick failed with error: %v", err)
			}
			for c := 0; c < 5; c++ {
				pi.Done(balancer.DoneInfo{})
			}
		}

		od.intervalTimerAlgorithm()

		// With only successes recorded, no listener may be called; the
		// receive must time out.
		sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
		defer cancel()
		if _, err := scsCh.Receive(sCtx); err == nil {
			t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)")
		}

		// A state update for scw1, which is not ejected, must be forwarded to
		// its listener unchanged.
		od.updateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
			ConnectivityState: connectivity.Connecting,
		})

		gotSCWS, err := scsCh.Receive(ctx)
		if err != nil {
			t.Fatalf("Error waiting for Sub Conn update: %v", err)
		}
		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
			sc:    scw1,
			state: balancer.SubConnState{ConnectivityState: connectivity.Connecting},
		}); err != nil {
			t.Fatalf("Error in Sub Conn update: %v", err)
		}

		// Next round: the first two picks report 5 successes each and the
		// third pick reports 5 failures.
		for i := 0; i < 2; i++ {
			pi, err := picker.Pick(balancer.PickInfo{})
			if err != nil {
				t.Fatalf("picker.Pick failed with error: %v", err)
			}
			for c := 0; c < 5; c++ {
				pi.Done(balancer.DoneInfo{})
			}
		}
		pi, err := picker.Pick(balancer.PickInfo{})
		if err != nil {
			t.Fatalf("picker.Pick failed with error: %v", err)
		}
		for c := 0; c < 5; c++ {
			pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
		}

		// Running the algorithm is expected to eject the failing SubConn.
		od.intervalTimerAlgorithm()

		// scw3's listener must be told TRANSIENT_FAILURE.
		gotSCWS, err = scsCh.Receive(ctx)
		if err != nil {
			t.Fatalf("Error waiting for Sub Conn update: %v", err)
		}
		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
			sc:    scw3,
			state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
		}); err != nil {
			t.Fatalf("Error in Sub Conn update: %v", err)
		}
		// That must be the only update; a second receive must time out.
		sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
		defer cancel()
		if _, err := scsCh.Receive(sCtx); err == nil {
			t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)")
		}

		// A state update for the ejected SubConn (the one the last pick
		// returned) must not reach its listener while it is ejected.
		od.updateSubConnState(pi.SubConn, balancer.SubConnState{
			ConnectivityState: connectivity.Connecting,
		})
		sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
		defer cancel()
		if _, err := scsCh.Receive(sCtx); err == nil {
			t.Fatalf("SubConn update should not have been forwarded (the SubConn is ejected)")
		}

		// Move now() 1000 seconds ahead of the real clock, restoring the
		// original hook on exit, and run the algorithm again. The test
		// expects this to unej­ect the SubConn.
		defer func(n func() time.Time) {
			now = n
		}(now)
		now = func() time.Time {
			return time.Now().Add(time.Second * 1000)
		}
		od.intervalTimerAlgorithm()

		// scw3's listener must now receive the CONNECTING state that was
		// withheld above.
		gotSCWS, err = scsCh.Receive(ctx)
		if err != nil {
			t.Fatalf("Error waiting for Sub Conn update: %v", err)
		}
		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
			sc:    scw3,
			state: balancer.SubConnState{ConnectivityState: connectivity.Connecting},
		}); err != nil {
			t.Fatalf("Error in Sub Conn update: %v", err)
		}
	}
}
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198 func (s) TestEjectFailureRate(t *testing.T) {
1199 scsCh := testutils.NewChannel()
1200 var scw1, scw2, scw3 balancer.SubConn
1201 var err error
1202 stub.Register(t.Name(), stub.BalancerFuncs{
1203 UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
1204 if scw1 != nil {
1205 return nil
1206 }
1207 scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{
1208 StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw1, state: state}) },
1209 })
1210 if err != nil {
1211 t.Errorf("error in od.NewSubConn call: %v", err)
1212 }
1213 scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{
1214 StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw2, state: state}) },
1215 })
1216 if err != nil {
1217 t.Errorf("error in od.NewSubConn call: %v", err)
1218 }
1219 scw3, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{
1220 StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw3, state: state}) },
1221 })
1222 if err != nil {
1223 t.Errorf("error in od.NewSubConn call: %v", err)
1224 }
1225 return nil
1226 },
1227 })
1228
1229 od, tcc, cleanup := setup(t)
1230 defer func() {
1231 cleanup()
1232 }()
1233
1234 od.UpdateClientConnState(balancer.ClientConnState{
1235 ResolverState: resolver.State{
1236 Addresses: []resolver.Address{
1237 {Addr: "address1"},
1238 {Addr: "address2"},
1239 {Addr: "address3"},
1240 },
1241 },
1242 BalancerConfig: &LBConfig{
1243 Interval: math.MaxInt64,
1244 BaseEjectionTime: iserviceconfig.Duration(30 * time.Second),
1245 MaxEjectionTime: iserviceconfig.Duration(300 * time.Second),
1246 MaxEjectionPercent: 10,
1247 SuccessRateEjection: &SuccessRateEjection{
1248 StdevFactor: 500,
1249 EnforcementPercentage: 100,
1250 MinimumHosts: 3,
1251 RequestVolume: 3,
1252 },
1253 ChildPolicy: &iserviceconfig.BalancerConfig{
1254 Name: t.Name(),
1255 Config: emptyChildConfig{},
1256 },
1257 },
1258 })
1259
1260 od.UpdateState(balancer.State{
1261 ConnectivityState: connectivity.Ready,
1262 Picker: &rrPicker{
1263 scs: []balancer.SubConn{scw1, scw2, scw3},
1264 },
1265 })
1266
1267 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1268 defer cancel()
1269
1270 select {
1271 case <-ctx.Done():
1272 t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
1273 case picker := <-tcc.NewPickerCh:
1274
1275
1276
1277 for i := 0; i < 3; i++ {
1278 pi, err := picker.Pick(balancer.PickInfo{})
1279 if err != nil {
1280 t.Fatalf("picker.Pick failed with error: %v", err)
1281 }
1282 for c := 0; c < 5; c++ {
1283 pi.Done(balancer.DoneInfo{})
1284 }
1285 }
1286
1287 od.intervalTimerAlgorithm()
1288 sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
1289 defer cancel()
1290 if _, err := scsCh.Receive(sCtx); err == nil {
1291 t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)")
1292 }
1293
1294
1295
1296
1297
1298 for i := 0; i < 2; i++ {
1299 pi, err := picker.Pick(balancer.PickInfo{})
1300 if err != nil {
1301 t.Fatalf("picker.Pick failed with error: %v", err)
1302 }
1303 for c := 0; c < 5; c++ {
1304 pi.Done(balancer.DoneInfo{})
1305 }
1306 }
1307 pi, err := picker.Pick(balancer.PickInfo{})
1308 if err != nil {
1309 t.Fatalf("picker.Pick failed with error: %v", err)
1310 }
1311 for c := 0; c < 5; c++ {
1312 pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
1313 }
1314
1315
1316 od.intervalTimerAlgorithm()
1317
1318
1319
1320 gotSCWS, err := scsCh.Receive(ctx)
1321 if err != nil {
1322 t.Fatalf("Error waiting for Sub Conn update: %v", err)
1323 }
1324 if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
1325 sc: scw3,
1326 state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
1327 }); err != nil {
1328 t.Fatalf("Error in Sub Conn update: %v", err)
1329 }
1330
1331
1332 sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
1333 defer cancel()
1334 if _, err := scsCh.Receive(sCtx); err == nil {
1335 t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)")
1336 }
1337
1338
1339
1340 od.UpdateClientConnState(balancer.ClientConnState{
1341 ResolverState: resolver.State{
1342 Addresses: []resolver.Address{
1343 {Addr: "address1"},
1344 {Addr: "address2"},
1345 {Addr: "address3"},
1346 },
1347 },
1348 BalancerConfig: &LBConfig{
1349 Interval: math.MaxInt64,
1350 BaseEjectionTime: iserviceconfig.Duration(30 * time.Second),
1351 MaxEjectionTime: iserviceconfig.Duration(300 * time.Second),
1352 MaxEjectionPercent: 10,
1353 ChildPolicy: &iserviceconfig.BalancerConfig{
1354 Name: t.Name(),
1355 Config: emptyChildConfig{},
1356 },
1357 },
1358 })
1359 gotSCWS, err = scsCh.Receive(ctx)
1360 if err != nil {
1361 t.Fatalf("Error waiting for Sub Conn update: %v", err)
1362 }
1363 if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
1364 sc: scw3,
1365 state: balancer.SubConnState{ConnectivityState: connectivity.Idle},
1366 }); err != nil {
1367 t.Fatalf("Error in Sub Conn update: %v", err)
1368 }
1369 }
1370 }
1371
1372
1373
1374
1375
1376 func (s) TestConcurrentOperations(t *testing.T) {
1377 closed := grpcsync.NewEvent()
1378 stub.Register(t.Name(), stub.BalancerFuncs{
1379 UpdateClientConnState: func(*stub.BalancerData, balancer.ClientConnState) error {
1380 if closed.HasFired() {
1381 t.Error("UpdateClientConnState was called after Close(), which breaks the balancer API")
1382 }
1383 return nil
1384 },
1385 ResolverError: func(*stub.BalancerData, error) {
1386 if closed.HasFired() {
1387 t.Error("ResolverError was called after Close(), which breaks the balancer API")
1388 }
1389 },
1390 Close: func(*stub.BalancerData) {
1391 closed.Fire()
1392 },
1393 ExitIdle: func(*stub.BalancerData) {
1394 if closed.HasFired() {
1395 t.Error("ExitIdle was called after Close(), which breaks the balancer API")
1396 }
1397 },
1398 })
1399
1400 od, tcc, cleanup := setup(t)
1401 defer func() {
1402 cleanup()
1403 }()
1404
1405 od.UpdateClientConnState(balancer.ClientConnState{
1406 ResolverState: resolver.State{
1407 Addresses: []resolver.Address{
1408 {Addr: "address1"},
1409 {Addr: "address2"},
1410 {Addr: "address3"},
1411 },
1412 },
1413 BalancerConfig: &LBConfig{
1414 Interval: math.MaxInt64,
1415 BaseEjectionTime: iserviceconfig.Duration(30 * time.Second),
1416 MaxEjectionTime: iserviceconfig.Duration(300 * time.Second),
1417 MaxEjectionPercent: 10,
1418 SuccessRateEjection: &SuccessRateEjection{
1419 StdevFactor: 500,
1420 EnforcementPercentage: 100,
1421 MinimumHosts: 3,
1422 RequestVolume: 3,
1423 },
1424 FailurePercentageEjection: &FailurePercentageEjection{
1425 Threshold: 50,
1426 EnforcementPercentage: 100,
1427 MinimumHosts: 3,
1428 RequestVolume: 3,
1429 },
1430 ChildPolicy: &iserviceconfig.BalancerConfig{
1431 Name: t.Name(),
1432 Config: emptyChildConfig{},
1433 },
1434 },
1435 })
1436 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1437 defer cancel()
1438
1439 scw1, err := od.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
1440 if err != nil {
1441 t.Fatalf("error in od.NewSubConn call: %v", err)
1442 }
1443 if err != nil {
1444 t.Fatalf("error in od.NewSubConn call: %v", err)
1445 }
1446
1447 scw2, err := od.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{})
1448 if err != nil {
1449 t.Fatalf("error in od.NewSubConn call: %v", err)
1450 }
1451
1452 scw3, err := od.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{})
1453 if err != nil {
1454 t.Fatalf("error in od.NewSubConn call: %v", err)
1455 }
1456
1457 od.UpdateState(balancer.State{
1458 ConnectivityState: connectivity.Ready,
1459 Picker: &rrPicker{
1460 scs: []balancer.SubConn{scw2, scw3},
1461 },
1462 })
1463
1464 var picker balancer.Picker
1465 select {
1466 case <-ctx.Done():
1467 t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
1468 case picker = <-tcc.NewPickerCh:
1469 }
1470
1471 finished := make(chan struct{})
1472 var wg sync.WaitGroup
1473 wg.Add(1)
1474 go func() {
1475 defer wg.Done()
1476 for {
1477 select {
1478 case <-finished:
1479 return
1480 default:
1481 }
1482 pi, err := picker.Pick(balancer.PickInfo{})
1483 if err != nil {
1484 continue
1485 }
1486 pi.Done(balancer.DoneInfo{})
1487 pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
1488 time.Sleep(1 * time.Nanosecond)
1489 }
1490 }()
1491
1492 wg.Add(1)
1493 go func() {
1494 defer wg.Done()
1495 for {
1496 select {
1497 case <-finished:
1498 return
1499 default:
1500 }
1501 od.intervalTimerAlgorithm()
1502 }
1503 }()
1504
1505
1506
1507
1508 wg.Add(1)
1509 go func() {
1510 defer wg.Done()
1511 for {
1512 select {
1513 case <-finished:
1514 return
1515 default:
1516 }
1517 od.UpdateState(balancer.State{
1518 ConnectivityState: connectivity.Ready,
1519 Picker: &rrPicker{
1520 scs: []balancer.SubConn{scw2, scw3},
1521 },
1522 })
1523 time.Sleep(1 * time.Nanosecond)
1524 }
1525 }()
1526
1527 wg.Add(1)
1528 go func() {
1529 defer wg.Done()
1530 od.NewSubConn([]resolver.Address{{Addr: "address4"}}, balancer.NewSubConnOptions{})
1531 }()
1532
1533 wg.Add(1)
1534 go func() {
1535 defer wg.Done()
1536 scw1.Shutdown()
1537 }()
1538
1539 wg.Add(1)
1540 go func() {
1541 defer wg.Done()
1542 od.UpdateAddresses(scw2, []resolver.Address{{Addr: "address3"}})
1543 }()
1544
1545
1546
1547 od.UpdateClientConnState(balancer.ClientConnState{
1548 ResolverState: resolver.State{
1549 Addresses: []resolver.Address{{Addr: "address1"}},
1550 },
1551 BalancerConfig: &LBConfig{
1552 Interval: math.MaxInt64,
1553 ChildPolicy: &iserviceconfig.BalancerConfig{
1554 Name: t.Name(),
1555 Config: emptyChildConfig{},
1556 },
1557 },
1558 })
1559
1560
1561
1562 od.updateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
1563 ConnectivityState: connectivity.Connecting,
1564 })
1565 od.ResolverError(errors.New("some error"))
1566 od.ExitIdle()
1567 od.Close()
1568 close(finished)
1569 wg.Wait()
1570 }
1571
View as plain text