1
18
19 package clusterimpl
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "strings"
26 "testing"
27 "time"
28
29 "github.com/google/go-cmp/cmp"
30 "github.com/google/go-cmp/cmp/cmpopts"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/balancer/base"
33 "google.golang.org/grpc/balancer/roundrobin"
34 "google.golang.org/grpc/connectivity"
35 "google.golang.org/grpc/internal/balancer/stub"
36 "google.golang.org/grpc/internal/grpctest"
37 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
38 "google.golang.org/grpc/internal/testutils"
39 "google.golang.org/grpc/internal/xds"
40 "google.golang.org/grpc/internal/xds/bootstrap"
41 "google.golang.org/grpc/resolver"
42 xdsinternal "google.golang.org/grpc/xds/internal"
43 "google.golang.org/grpc/xds/internal/testutils/fakeclient"
44 "google.golang.org/grpc/xds/internal/xdsclient"
45 "google.golang.org/grpc/xds/internal/xdsclient/load"
46
47 v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
48 )
49
50 const (
51 defaultTestTimeout = 5 * time.Second
52 defaultShortTestTimeout = 100 * time.Microsecond
53
54 testClusterName = "test-cluster"
55 testServiceName = "test-eds-service"
56
57 testNamedMetricsKey1 = "test-named1"
58 testNamedMetricsKey2 = "test-named2"
59 )
60
61 var (
62 testBackendAddrs = []resolver.Address{
63 {Addr: "1.1.1.1:1"},
64 }
65 testLRSServerConfig = &bootstrap.ServerConfig{
66 ServerURI: "trafficdirector.googleapis.com:443",
67 Creds: bootstrap.ChannelCreds{
68 Type: "google_default",
69 },
70 }
71
72 cmpOpts = cmp.Options{
73 cmpopts.EquateEmpty(),
74 cmpopts.IgnoreFields(load.Data{}, "ReportInterval"),
75 }
76 toleranceCmpOpt = cmpopts.EquateApprox(0, 1e-5)
77 )
78
79 type s struct {
80 grpctest.Tester
81 }
82
83 func Test(t *testing.T) {
84 grpctest.RunSubTests(t, s{})
85 }
86
87 func init() {
88 NewRandomWRR = testutils.NewTestWRR
89 }
90
91
92
93 func (s) TestDropByCategory(t *testing.T) {
94 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
95 defer cancel()
96
97 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
98 xdsC := fakeclient.NewClient()
99
100 builder := balancer.Get(Name)
101 cc := testutils.NewBalancerClientConn(t)
102 b := builder.Build(cc, balancer.BuildOptions{})
103 defer b.Close()
104
105 const (
106 dropReason = "test-dropping-category"
107 dropNumerator = 1
108 dropDenominator = 2
109 )
110 if err := b.UpdateClientConnState(balancer.ClientConnState{
111 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
112 BalancerConfig: &LBConfig{
113 Cluster: testClusterName,
114 EDSServiceName: testServiceName,
115 LoadReportingServer: testLRSServerConfig,
116 DropCategories: []DropConfig{{
117 Category: dropReason,
118 RequestsPerMillion: million * dropNumerator / dropDenominator,
119 }},
120 ChildPolicy: &internalserviceconfig.BalancerConfig{
121 Name: roundrobin.Name,
122 },
123 },
124 }); err != nil {
125 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
126 }
127
128 got, err := xdsC.WaitForReportLoad(ctx)
129 if err != nil {
130 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
131 }
132 if got.Server != testLRSServerConfig {
133 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
134 }
135
136 sc1 := <-cc.NewSubConnCh
137 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
138
139 if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
140 t.Fatal(err.Error())
141 }
142
143 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
144
145
146 const rpcCount = 20
147 if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
148 for i := 0; i < rpcCount; i++ {
149 gotSCSt, err := p.Pick(balancer.PickInfo{})
150
151 if i%2 == 0 {
152 if err == nil || !strings.Contains(err.Error(), "dropped") {
153 return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
154 }
155 continue
156 }
157 if err != nil || gotSCSt.SubConn != sc1 {
158 return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
159 }
160 if gotSCSt.Done != nil {
161 gotSCSt.Done(balancer.DoneInfo{})
162 }
163 }
164 return nil
165 }); err != nil {
166 t.Fatal(err.Error())
167 }
168
169
170 loadStore := xdsC.LoadStore()
171 if loadStore == nil {
172 t.Fatal("loadStore is nil in xdsClient")
173 }
174 const dropCount = rpcCount * dropNumerator / dropDenominator
175 wantStatsData0 := []*load.Data{{
176 Cluster: testClusterName,
177 Service: testServiceName,
178 TotalDrops: dropCount,
179 Drops: map[string]uint64{dropReason: dropCount},
180 LocalityStats: map[string]load.LocalityData{
181 assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount}},
182 },
183 }}
184
185 gotStatsData0 := loadStore.Stats([]string{testClusterName})
186 if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
187 t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
188 }
189
190
191 const (
192 dropReason2 = "test-dropping-category-2"
193 dropNumerator2 = 1
194 dropDenominator2 = 4
195 )
196 if err := b.UpdateClientConnState(balancer.ClientConnState{
197 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
198 BalancerConfig: &LBConfig{
199 Cluster: testClusterName,
200 EDSServiceName: testServiceName,
201 LoadReportingServer: testLRSServerConfig,
202 DropCategories: []DropConfig{{
203 Category: dropReason2,
204 RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
205 }},
206 ChildPolicy: &internalserviceconfig.BalancerConfig{
207 Name: roundrobin.Name,
208 },
209 },
210 }); err != nil {
211 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
212 }
213
214 if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
215 for i := 0; i < rpcCount; i++ {
216 gotSCSt, err := p.Pick(balancer.PickInfo{})
217
218 if i%4 == 0 {
219 if err == nil || !strings.Contains(err.Error(), "dropped") {
220 return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
221 }
222 continue
223 }
224 if err != nil || gotSCSt.SubConn != sc1 {
225 return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
226 }
227 if gotSCSt.Done != nil {
228 gotSCSt.Done(balancer.DoneInfo{})
229 }
230 }
231 return nil
232 }); err != nil {
233 t.Fatal(err.Error())
234 }
235
236 const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
237 wantStatsData1 := []*load.Data{{
238 Cluster: testClusterName,
239 Service: testServiceName,
240 TotalDrops: dropCount2,
241 Drops: map[string]uint64{dropReason2: dropCount2},
242 LocalityStats: map[string]load.LocalityData{
243 assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount2}},
244 },
245 }}
246
247 gotStatsData1 := loadStore.Stats([]string{testClusterName})
248 if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
249 t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
250 }
251 }
252
253
254
255 func (s) TestDropCircuitBreaking(t *testing.T) {
256 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
257 xdsC := fakeclient.NewClient()
258
259 builder := balancer.Get(Name)
260 cc := testutils.NewBalancerClientConn(t)
261 b := builder.Build(cc, balancer.BuildOptions{})
262 defer b.Close()
263
264 var maxRequest uint32 = 50
265 if err := b.UpdateClientConnState(balancer.ClientConnState{
266 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
267 BalancerConfig: &LBConfig{
268 Cluster: testClusterName,
269 EDSServiceName: testServiceName,
270 LoadReportingServer: testLRSServerConfig,
271 MaxConcurrentRequests: &maxRequest,
272 ChildPolicy: &internalserviceconfig.BalancerConfig{
273 Name: roundrobin.Name,
274 },
275 },
276 }); err != nil {
277 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
278 }
279
280 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
281 defer cancel()
282
283 got, err := xdsC.WaitForReportLoad(ctx)
284 if err != nil {
285 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
286 }
287 if got.Server != testLRSServerConfig {
288 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
289 }
290
291 sc1 := <-cc.NewSubConnCh
292 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
293
294 if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
295 t.Fatal(err.Error())
296 }
297
298 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
299
300 const rpcCount = 100
301 if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
302 dones := []func(){}
303 for i := 0; i < rpcCount; i++ {
304 gotSCSt, err := p.Pick(balancer.PickInfo{})
305 if i < 50 && err != nil {
306 return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err)
307 } else if i > 50 && err == nil {
308 return fmt.Errorf("The second 50%% picks should be drops, got error <nil>")
309 }
310 dones = append(dones, func() {
311 if gotSCSt.Done != nil {
312 gotSCSt.Done(balancer.DoneInfo{})
313 }
314 })
315 }
316 for _, done := range dones {
317 done()
318 }
319
320 dones = []func(){}
321
322 for i := 0; i < 50; i++ {
323 gotSCSt, err := p.Pick(balancer.PickInfo{})
324 if err != nil {
325 t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
326 }
327 dones = append(dones, func() {
328 if gotSCSt.Done != nil {
329 gotSCSt.Done(balancer.DoneInfo{})
330 }
331 })
332 }
333 for _, done := range dones {
334 done()
335 }
336
337 return nil
338 }); err != nil {
339 t.Fatal(err.Error())
340 }
341
342
343 loadStore := xdsC.LoadStore()
344 if loadStore == nil {
345 t.Fatal("loadStore is nil in xdsClient")
346 }
347
348 wantStatsData0 := []*load.Data{{
349 Cluster: testClusterName,
350 Service: testServiceName,
351 TotalDrops: uint64(maxRequest),
352 LocalityStats: map[string]load.LocalityData{
353 assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: uint64(rpcCount - maxRequest + 50)}},
354 },
355 }}
356
357 gotStatsData0 := loadStore.Stats([]string{testClusterName})
358 if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
359 t.Fatalf("got unexpected drop reports, diff (-got, +want): %v", diff)
360 }
361 }
362
363
364
365
366
367 func (s) TestPickerUpdateAfterClose(t *testing.T) {
368 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
369 xdsC := fakeclient.NewClient()
370
371 builder := balancer.Get(Name)
372 cc := testutils.NewBalancerClientConn(t)
373 b := builder.Build(cc, balancer.BuildOptions{})
374
375
376
377
378 closeCh := make(chan struct{})
379 const childPolicyName = "stubBalancer-TestPickerUpdateAfterClose"
380 stub.Register(childPolicyName, stub.BalancerFuncs{
381 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
382
383
384 sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{
385 StateListener: func(balancer.SubConnState) {
386 go func() {
387
388
389 <-closeCh
390 bd.ClientConn.UpdateState(balancer.State{
391 Picker: base.NewErrPicker(errors.New("dummy error picker")),
392 })
393 }()
394 },
395 })
396 if err != nil {
397 return err
398 }
399 sc.Connect()
400 return nil
401 },
402 })
403
404 var maxRequest uint32 = 50
405 if err := b.UpdateClientConnState(balancer.ClientConnState{
406 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
407 BalancerConfig: &LBConfig{
408 Cluster: testClusterName,
409 EDSServiceName: testServiceName,
410 MaxConcurrentRequests: &maxRequest,
411 ChildPolicy: &internalserviceconfig.BalancerConfig{
412 Name: childPolicyName,
413 },
414 },
415 }); err != nil {
416 b.Close()
417 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
418 }
419
420
421
422
423 sc1 := <-cc.NewSubConnCh
424 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
425 b.Close()
426 close(closeCh)
427
428 select {
429 case <-cc.NewPickerCh:
430 t.Fatalf("unexpected picker update after balancer is closed")
431 case <-time.After(defaultShortTestTimeout):
432 }
433 }
434
435
436
437 func (s) TestClusterNameInAddressAttributes(t *testing.T) {
438 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
439 defer cancel()
440
441 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
442 xdsC := fakeclient.NewClient()
443
444 builder := balancer.Get(Name)
445 cc := testutils.NewBalancerClientConn(t)
446 b := builder.Build(cc, balancer.BuildOptions{})
447 defer b.Close()
448
449 if err := b.UpdateClientConnState(balancer.ClientConnState{
450 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
451 BalancerConfig: &LBConfig{
452 Cluster: testClusterName,
453 EDSServiceName: testServiceName,
454 ChildPolicy: &internalserviceconfig.BalancerConfig{
455 Name: roundrobin.Name,
456 },
457 },
458 }); err != nil {
459 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
460 }
461
462 sc1 := <-cc.NewSubConnCh
463 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
464
465 if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
466 t.Fatal(err.Error())
467 }
468
469 addrs1 := <-cc.NewSubConnAddrsCh
470 if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want {
471 t.Fatalf("sc is created with addr %v, want %v", got, want)
472 }
473 cn, ok := xds.GetXDSHandshakeClusterName(addrs1[0].Attributes)
474 if !ok || cn != testClusterName {
475 t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName)
476 }
477
478 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
479
480 if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
481 t.Fatal(err.Error())
482 }
483
484 const testClusterName2 = "test-cluster-2"
485 var addr2 = resolver.Address{Addr: "2.2.2.2"}
486 if err := b.UpdateClientConnState(balancer.ClientConnState{
487 ResolverState: xdsclient.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC),
488 BalancerConfig: &LBConfig{
489 Cluster: testClusterName2,
490 EDSServiceName: testServiceName,
491 ChildPolicy: &internalserviceconfig.BalancerConfig{
492 Name: roundrobin.Name,
493 },
494 },
495 }); err != nil {
496 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
497 }
498
499 addrs2 := <-cc.NewSubConnAddrsCh
500 if got, want := addrs2[0].Addr, addr2.Addr; got != want {
501 t.Fatalf("sc is created with addr %v, want %v", got, want)
502 }
503
504 cn2, ok := xds.GetXDSHandshakeClusterName(addrs2[0].Attributes)
505 if !ok || cn2 != testClusterName2 {
506 t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn2, ok, testClusterName2)
507 }
508 }
509
510
511
512 func (s) TestReResolution(t *testing.T) {
513 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
514 defer cancel()
515
516 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
517 xdsC := fakeclient.NewClient()
518
519 builder := balancer.Get(Name)
520 cc := testutils.NewBalancerClientConn(t)
521 b := builder.Build(cc, balancer.BuildOptions{})
522 defer b.Close()
523
524 if err := b.UpdateClientConnState(balancer.ClientConnState{
525 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
526 BalancerConfig: &LBConfig{
527 Cluster: testClusterName,
528 EDSServiceName: testServiceName,
529 ChildPolicy: &internalserviceconfig.BalancerConfig{
530 Name: roundrobin.Name,
531 },
532 },
533 }); err != nil {
534 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
535 }
536
537 sc1 := <-cc.NewSubConnCh
538 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
539
540 if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
541 t.Fatal(err.Error())
542 }
543
544 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
545
546 if err := cc.WaitForErrPicker(ctx); err != nil {
547 t.Fatal(err.Error())
548 }
549
550
551 select {
552 case <-cc.ResolveNowCh:
553 case <-time.After(defaultTestTimeout):
554 t.Fatalf("timeout waiting for ResolveNow()")
555 }
556
557 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
558
559 if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
560 t.Fatal(err.Error())
561 }
562
563 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
564
565 if err := cc.WaitForErrPicker(ctx); err != nil {
566 t.Fatal(err.Error())
567 }
568
569
570 select {
571 case <-cc.ResolveNowCh:
572 case <-time.After(defaultTestTimeout):
573 t.Fatalf("timeout waiting for ResolveNow()")
574 }
575 }
576
577 func (s) TestLoadReporting(t *testing.T) {
578 var testLocality = xdsinternal.LocalityID{
579 Region: "test-region",
580 Zone: "test-zone",
581 SubZone: "test-sub-zone",
582 }
583
584 xdsC := fakeclient.NewClient()
585
586 builder := balancer.Get(Name)
587 cc := testutils.NewBalancerClientConn(t)
588 b := builder.Build(cc, balancer.BuildOptions{})
589 defer b.Close()
590
591 addrs := make([]resolver.Address, len(testBackendAddrs))
592 for i, a := range testBackendAddrs {
593 addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
594 }
595 if err := b.UpdateClientConnState(balancer.ClientConnState{
596 ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
597 BalancerConfig: &LBConfig{
598 Cluster: testClusterName,
599 EDSServiceName: testServiceName,
600 LoadReportingServer: testLRSServerConfig,
601
602 ChildPolicy: &internalserviceconfig.BalancerConfig{
603 Name: roundrobin.Name,
604 },
605 },
606 }); err != nil {
607 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
608 }
609
610 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
611 defer cancel()
612
613 got, err := xdsC.WaitForReportLoad(ctx)
614 if err != nil {
615 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
616 }
617 if got.Server != testLRSServerConfig {
618 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
619 }
620
621 sc1 := <-cc.NewSubConnCh
622 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
623
624 if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
625 t.Fatal(err.Error())
626 }
627
628 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
629
630 const successCount = 5
631 const errorCount = 5
632 if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
633 for i := 0; i < successCount; i++ {
634 gotSCSt, err := p.Pick(balancer.PickInfo{})
635 if gotSCSt.SubConn != sc1 {
636 return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
637 }
638 lr := &v3orcapb.OrcaLoadReport{
639 NamedMetrics: map[string]float64{testNamedMetricsKey1: 3.14, testNamedMetricsKey2: 2.718},
640 }
641 gotSCSt.Done(balancer.DoneInfo{ServerLoad: lr})
642 }
643 for i := 0; i < errorCount; i++ {
644 gotSCSt, err := p.Pick(balancer.PickInfo{})
645 if gotSCSt.SubConn != sc1 {
646 return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
647 }
648 gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
649 }
650 return nil
651 }); err != nil {
652 t.Fatal(err.Error())
653 }
654
655
656 loadStore := xdsC.LoadStore()
657 if loadStore == nil {
658 t.Fatal("loadStore is nil in xdsClient")
659 }
660 sds := loadStore.Stats([]string{testClusterName})
661 if len(sds) == 0 {
662 t.Fatalf("loads for cluster %v not found in store", testClusterName)
663 }
664 sd := sds[0]
665 if sd.Cluster != testClusterName || sd.Service != testServiceName {
666 t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
667 }
668 testLocalityJSON, _ := testLocality.ToString()
669 localityData, ok := sd.LocalityStats[testLocalityJSON]
670 if !ok {
671 t.Fatalf("loads for %v not found in store", testLocality)
672 }
673 reqStats := localityData.RequestStats
674 if reqStats.Succeeded != successCount {
675 t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount)
676 }
677 if reqStats.Errored != errorCount {
678 t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount)
679 }
680 if reqStats.InProgress != 0 {
681 t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
682 }
683 wantLoadStats := map[string]load.ServerLoadData{
684 testNamedMetricsKey1: {Count: 5, Sum: 15.7},
685 testNamedMetricsKey2: {Count: 5, Sum: 13.59},
686 }
687 if diff := cmp.Diff(wantLoadStats, localityData.LoadStats, toleranceCmpOpt); diff != "" {
688 t.Errorf("localityData.LoadStats returned unexpected diff (-want +got):\n%s", diff)
689 }
690 b.Close()
691 if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
692 t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
693 }
694 }
695
696
697
698
699
700 func (s) TestUpdateLRSServer(t *testing.T) {
701 var testLocality = xdsinternal.LocalityID{
702 Region: "test-region",
703 Zone: "test-zone",
704 SubZone: "test-sub-zone",
705 }
706
707 xdsC := fakeclient.NewClient()
708
709 builder := balancer.Get(Name)
710 cc := testutils.NewBalancerClientConn(t)
711 b := builder.Build(cc, balancer.BuildOptions{})
712 defer b.Close()
713
714 addrs := make([]resolver.Address, len(testBackendAddrs))
715 for i, a := range testBackendAddrs {
716 addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
717 }
718 if err := b.UpdateClientConnState(balancer.ClientConnState{
719 ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
720 BalancerConfig: &LBConfig{
721 Cluster: testClusterName,
722 EDSServiceName: testServiceName,
723 LoadReportingServer: testLRSServerConfig,
724 ChildPolicy: &internalserviceconfig.BalancerConfig{
725 Name: roundrobin.Name,
726 },
727 },
728 }); err != nil {
729 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
730 }
731
732 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
733 defer cancel()
734
735 got, err := xdsC.WaitForReportLoad(ctx)
736 if err != nil {
737 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
738 }
739 if got.Server != testLRSServerConfig {
740 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
741 }
742
743 testLRSServerConfig2 := &bootstrap.ServerConfig{
744 ServerURI: "trafficdirector-another.googleapis.com:443",
745 Creds: bootstrap.ChannelCreds{
746 Type: "google_default",
747 },
748 }
749
750 if err := b.UpdateClientConnState(balancer.ClientConnState{
751 ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
752 BalancerConfig: &LBConfig{
753 Cluster: testClusterName,
754 EDSServiceName: testServiceName,
755 LoadReportingServer: testLRSServerConfig2,
756 ChildPolicy: &internalserviceconfig.BalancerConfig{
757 Name: roundrobin.Name,
758 },
759 },
760 }); err != nil {
761 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
762 }
763 if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
764 t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
765 }
766 got2, err2 := xdsC.WaitForReportLoad(ctx)
767 if err2 != nil {
768 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
769 }
770 if got2.Server != testLRSServerConfig2 {
771 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2)
772 }
773
774
775 if err := b.UpdateClientConnState(balancer.ClientConnState{
776 ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
777 BalancerConfig: &LBConfig{
778 Cluster: testClusterName,
779 EDSServiceName: testServiceName,
780 ChildPolicy: &internalserviceconfig.BalancerConfig{
781 Name: roundrobin.Name,
782 },
783 },
784 }); err != nil {
785 t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
786 }
787 if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
788 t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
789 }
790
791 shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
792 defer shortCancel()
793 if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded {
794 t.Fatalf("unexpected load report to server: %q", s)
795 }
796 }
797
798 func assertString(f func() (string, error)) string {
799 s, err := f()
800 if err != nil {
801 panic(err.Error())
802 }
803 return s
804 }
805
View as plain text