1
18
19 package xdsclient
20
21 import (
22 "context"
23 "testing"
24 "time"
25
26 "github.com/google/go-cmp/cmp"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/internal/testutils/xds/fakeserver"
29 "google.golang.org/grpc/internal/xds/bootstrap"
30 "google.golang.org/grpc/status"
31 xdstestutils "google.golang.org/grpc/xds/internal/testutils"
32 "google.golang.org/protobuf/testing/protocmp"
33
34 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
35 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
36 v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
37 "google.golang.org/protobuf/types/known/durationpb"
38 )
39
// defaultClientWatchExpiryTimeout is the timeout value handed to
// NewWithConfigForTesting when a test in this file builds an xDS client.
const defaultClientWatchExpiryTimeout = 15 * time.Second
43
44 func (s) TestLRSClient(t *testing.T) {
45 fs1, sCleanup, err := fakeserver.StartServer(nil)
46 if err != nil {
47 t.Fatalf("failed to start fake xDS server: %v", err)
48 }
49 defer sCleanup()
50
51 serverCfg1 := xdstestutils.ServerConfigForAddress(t, fs1.Address)
52 xdsC, close, err := NewWithConfigForTesting(&bootstrap.Config{
53 XDSServer: serverCfg1,
54 NodeProto: &v3corepb.Node{},
55 }, defaultClientWatchExpiryTimeout, time.Duration(0))
56 if err != nil {
57 t.Fatalf("failed to create xds client: %v", err)
58 }
59 defer close()
60
61
62 store1, lrsCancel1 := xdsC.ReportLoad(serverCfg1)
63 defer lrsCancel1()
64
65 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
66 defer cancel()
67 if u, err := fs1.NewConnChan.Receive(ctx); err != nil {
68 t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
69 }
70
71 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
72 defer sCancel()
73 if u, err := fs1.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {
74 t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
75 }
76
77 fs2, sCleanup2, err := fakeserver.StartServer(nil)
78 if err != nil {
79 t.Fatalf("failed to start fake xDS server: %v", err)
80 }
81 defer sCleanup2()
82
83
84 serverCgf2 := xdstestutils.ServerConfigForAddress(t, fs2.Address)
85 store2, lrsCancel2 := xdsC.ReportLoad(serverCgf2)
86 defer lrsCancel2()
87 if u, err := fs2.NewConnChan.Receive(ctx); err != nil {
88 t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
89 }
90
91 if store1 == store2 {
92 t.Fatalf("got same store for different servers, want different")
93 }
94
95 if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil {
96 t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
97 }
98 store2.PerCluster("cluster", "eds").CallDropped("test")
99
100
101 fs2.LRSResponseChan <- &fakeserver.Response{
102 Resp: &v3lrspb.LoadStatsResponse{
103 SendAllClusters: true,
104 LoadReportingInterval: &durationpb.Duration{Nanos: 50000000},
105 },
106 }
107
108
109 u, err := fs2.LRSRequestChan.Receive(ctx)
110 if err != nil {
111 t.Fatalf("unexpected LRS request: %v, %v, want error canceled", u, err)
112 }
113 receivedLoad := u.(*fakeserver.Request).Req.(*v3lrspb.LoadStatsRequest).ClusterStats
114 if len(receivedLoad) <= 0 {
115 t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test")
116 }
117 receivedLoad[0].LoadReportInterval = nil
118 want := &v3endpointpb.ClusterStats{
119 ClusterName: "cluster",
120 ClusterServiceName: "eds",
121 TotalDroppedRequests: 1,
122 DroppedRequests: []*v3endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
123 }
124 if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" {
125 t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d)
126 }
127
128
129 lrsCancel2()
130
131
132 if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled {
133 t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err)
134 }
135 }
136
View as plain text