1
18
19 package xds_test
20
21 import (
22 "context"
23 "fmt"
24 "testing"
25 "time"
26
27 "google.golang.org/grpc"
28 _ "google.golang.org/grpc/balancer/leastrequest"
29 _ "google.golang.org/grpc/balancer/weightedroundrobin"
30 "google.golang.org/grpc/credentials/insecure"
31 "google.golang.org/grpc/internal/envconfig"
32 "google.golang.org/grpc/internal/stubserver"
33 "google.golang.org/grpc/internal/testutils"
34 "google.golang.org/grpc/internal/testutils/roundrobin"
35 "google.golang.org/grpc/internal/testutils/xds/e2e"
36 "google.golang.org/grpc/resolver"
37
38 v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
39 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
40 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
41 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
42 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
43 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
44 v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
45 v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
46 v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
47 v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
48 testgrpc "google.golang.org/grpc/interop/grpc_testing"
49 "google.golang.org/protobuf/proto"
50 "google.golang.org/protobuf/types/known/durationpb"
51 "google.golang.org/protobuf/types/known/structpb"
52 "google.golang.org/protobuf/types/known/wrapperspb"
53 )
54
55
56
57
58 func wrrLocality(t *testing.T, m proto.Message) *v3wrrlocalitypb.WrrLocality {
59 return &v3wrrlocalitypb.WrrLocality{
60 EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{
61 Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
62 {
63 TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
64 TypedConfig: testutils.MarshalAny(t, m),
65 },
66 },
67 },
68 },
69 }
70 }
71
72
73
74
75 func clusterWithLBConfiguration(t *testing.T, clusterName, edsServiceName string, secLevel e2e.SecurityLevel, m proto.Message) *v3clusterpb.Cluster {
76 cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel)
77 cluster.LoadBalancingPolicy = &v3clusterpb.LoadBalancingPolicy{
78 Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
79 {
80 TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
81 TypedConfig: testutils.MarshalAny(t, m),
82 },
83 },
84 },
85 }
86 return cluster
87 }
88
89
90
91
92
93
94
95 func (s) TestWrrLocality(t *testing.T) {
96 oldLeastRequestLBSupport := envconfig.LeastRequestLB
97 envconfig.LeastRequestLB = true
98 defer func() {
99 envconfig.LeastRequestLB = oldLeastRequestLBSupport
100 }()
101
102 backend1 := stubserver.StartTestService(t, nil)
103 port1 := testutils.ParsePort(t, backend1.Address)
104 defer backend1.Stop()
105 backend2 := stubserver.StartTestService(t, nil)
106 port2 := testutils.ParsePort(t, backend2.Address)
107 defer backend2.Stop()
108 backend3 := stubserver.StartTestService(t, nil)
109 port3 := testutils.ParsePort(t, backend3.Address)
110 defer backend3.Stop()
111 backend4 := stubserver.StartTestService(t, nil)
112 port4 := testutils.ParsePort(t, backend4.Address)
113 defer backend4.Stop()
114 backend5 := stubserver.StartTestService(t, nil)
115 port5 := testutils.ParsePort(t, backend5.Address)
116 defer backend5.Stop()
117 const serviceName = "my-service-client-side-xds"
118
119 tests := []struct {
120 name string
121
122 wrrLocalityConfiguration *v3wrrlocalitypb.WrrLocality
123 addressDistributionWant []struct {
124 addr string
125 count int
126 }
127 }{
128 {
129 name: "rr_child",
130 wrrLocalityConfiguration: wrrLocality(t, &v3roundrobinpb.RoundRobin{}),
131
132
133
134
135
136
137 addressDistributionWant: []struct {
138 addr string
139 count int
140 }{
141 {addr: backend1.Address, count: 6},
142 {addr: backend2.Address, count: 6},
143 {addr: backend3.Address, count: 8},
144 {addr: backend4.Address, count: 8},
145 {addr: backend5.Address, count: 8},
146 },
147 },
148
149
150
151
152
153 {
154 name: "custom_lb_child_pick_first",
155 wrrLocalityConfiguration: wrrLocality(t, &v3xdsxdstypepb.TypedStruct{
156 TypeUrl: "type.googleapis.com/pick_first",
157 Value: &structpb.Struct{},
158 }),
159 addressDistributionWant: []struct {
160 addr string
161 count int
162 }{
163 {addr: backend1.Address, count: 1},
164 {addr: backend3.Address, count: 2},
165 },
166 },
167
168
169
170
171
172
173
174 {
175 name: "custom_lb_child_wrr/",
176 wrrLocalityConfiguration: wrrLocality(t, &v3clientsideweightedroundrobinpb.ClientSideWeightedRoundRobin{
177 EnableOobLoadReport: &wrapperspb.BoolValue{
178 Value: false,
179 },
180
181
182
183 BlackoutPeriod: durationpb.New(10 * time.Second),
184 WeightExpirationPeriod: durationpb.New(10 * time.Second),
185 WeightUpdatePeriod: durationpb.New(time.Second),
186 ErrorUtilizationPenalty: &wrapperspb.FloatValue{Value: 1},
187 }),
188 addressDistributionWant: []struct {
189 addr string
190 count int
191 }{
192 {addr: backend1.Address, count: 6},
193 {addr: backend2.Address, count: 6},
194 {addr: backend3.Address, count: 8},
195 {addr: backend4.Address, count: 8},
196 {addr: backend5.Address, count: 8},
197 },
198 },
199 {
200 name: "custom_lb_least_request",
201 wrrLocalityConfiguration: wrrLocality(t, &v3leastrequestpb.LeastRequest{
202 ChoiceCount: wrapperspb.UInt32(2),
203 }),
204
205
206
207
208
209
210 addressDistributionWant: []struct {
211 addr string
212 count int
213 }{
214 {addr: backend1.Address, count: 6},
215 {addr: backend2.Address, count: 6},
216 {addr: backend3.Address, count: 8},
217 {addr: backend4.Address, count: 8},
218 {addr: backend5.Address, count: 8},
219 },
220 },
221 }
222 for _, test := range tests {
223 t.Run(test.name, func(t *testing.T) {
224 managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
225 defer cleanup()
226 routeConfigName := "route-" + serviceName
227 clusterName := "cluster-" + serviceName
228 endpointsName := "endpoints-" + serviceName
229 resources := e2e.UpdateOptions{
230 NodeID: nodeID,
231 Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)},
232 Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)},
233 Clusters: []*v3clusterpb.Cluster{clusterWithLBConfiguration(t, clusterName, endpointsName, e2e.SecurityLevelNone, test.wrrLocalityConfiguration)},
234 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
235 ClusterName: endpointsName,
236 Host: "localhost",
237 Localities: []e2e.LocalityOptions{
238 {
239 Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}},
240 Weight: 1,
241 },
242 {
243 Backends: []e2e.BackendOptions{{Port: port3}, {Port: port4}, {Port: port5}},
244 Weight: 2,
245 },
246 },
247 })},
248 }
249
250 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
251 defer cancel()
252 if err := managementServer.Update(ctx, resources); err != nil {
253 t.Fatal(err)
254 }
255
256 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
257 if err != nil {
258 t.Fatalf("Failed to dial local test server: %v", err)
259 }
260 defer cc.Close()
261
262 client := testgrpc.NewTestServiceClient(cc)
263 var addrDistWant []resolver.Address
264 for _, addrAndCount := range test.addressDistributionWant {
265 for i := 0; i < addrAndCount.count; i++ {
266 addrDistWant = append(addrDistWant, resolver.Address{Addr: addrAndCount.addr})
267 }
268 }
269 if err := roundrobin.CheckWeightedRoundRobinRPCs(ctx, client, addrDistWant); err != nil {
270 t.Fatalf("Error in expected round robin: %v", err)
271 }
272 })
273 }
274 }
275
View as plain text