1
17
18 package xdsresource
19
20 import (
21 "fmt"
22 "net"
23 "strconv"
24 "testing"
25
26 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
27 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
28 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
29 v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
30 "github.com/google/go-cmp/cmp"
31 "google.golang.org/grpc/internal/pretty"
32 "google.golang.org/grpc/internal/testutils"
33 "google.golang.org/grpc/xds/internal"
34 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
35 "google.golang.org/protobuf/types/known/anypb"
36 "google.golang.org/protobuf/types/known/wrapperspb"
37 )
38
39 func (s) TestEDSParseRespProto(t *testing.T) {
40 tests := []struct {
41 name string
42 m *v3endpointpb.ClusterLoadAssignment
43 want EndpointsUpdate
44 wantErr bool
45 }{
46 {
47 name: "missing-priority",
48 m: func() *v3endpointpb.ClusterLoadAssignment {
49 clab0 := newClaBuilder("test", nil)
50 clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
51 clab0.addLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
52 return clab0.Build()
53 }(),
54 want: EndpointsUpdate{},
55 wantErr: true,
56 },
57 {
58 name: "missing-locality-ID",
59 m: func() *v3endpointpb.ClusterLoadAssignment {
60 clab0 := newClaBuilder("test", nil)
61 clab0.addLocality("", 1, 0, []string{"addr1:314"}, nil)
62 return clab0.Build()
63 }(),
64 want: EndpointsUpdate{},
65 wantErr: true,
66 },
67 {
68 name: "zero-endpoint-weight",
69 m: func() *v3endpointpb.ClusterLoadAssignment {
70 clab0 := newClaBuilder("test", nil)
71 clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, &addLocalityOptions{Weight: []uint32{0}})
72 return clab0.Build()
73 }(),
74 want: EndpointsUpdate{},
75 wantErr: true,
76 },
77 {
78 name: "duplicate-locality-in-the-same-priority",
79 m: func() *v3endpointpb.ClusterLoadAssignment {
80 clab0 := newClaBuilder("test", nil)
81 clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, nil)
82 clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, nil)
83 return clab0.Build()
84 }(),
85 want: EndpointsUpdate{},
86 wantErr: true,
87 },
88 {
89 name: "missing locality weight",
90 m: func() *v3endpointpb.ClusterLoadAssignment {
91 clab0 := newClaBuilder("test", nil)
92 clab0.addLocality("locality-1", 0, 1, []string{"addr1:314"}, &addLocalityOptions{
93 Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY},
94 })
95 clab0.addLocality("locality-2", 0, 0, []string{"addr2:159"}, &addLocalityOptions{
96 Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY},
97 })
98 return clab0.Build()
99 }(),
100 want: EndpointsUpdate{},
101 },
102 {
103 name: "max sum of weights at the same priority exceeded",
104 m: func() *v3endpointpb.ClusterLoadAssignment {
105 clab0 := newClaBuilder("test", nil)
106 clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
107 clab0.addLocality("locality-2", 4294967295, 1, []string{"addr2:159"}, nil)
108 clab0.addLocality("locality-3", 1, 1, []string{"addr2:88"}, nil)
109 return clab0.Build()
110 }(),
111 want: EndpointsUpdate{},
112 wantErr: true,
113 },
114 {
115 name: "duplicate endpoint address",
116 m: func() *v3endpointpb.ClusterLoadAssignment {
117 clab0 := newClaBuilder("test", nil)
118 clab0.addLocality("locality-1", 1, 1, []string{"addr:997"}, nil)
119 clab0.addLocality("locality-2", 1, 0, []string{"addr:997"}, nil)
120 return clab0.Build()
121 }(),
122 want: EndpointsUpdate{},
123 wantErr: true,
124 },
125 {
126 name: "good",
127 m: func() *v3endpointpb.ClusterLoadAssignment {
128 clab0 := newClaBuilder("test", nil)
129 clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{
130 Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY},
131 Weight: []uint32{271},
132 })
133 clab0.addLocality("locality-2", 1, 0, []string{"addr2:159"}, &addLocalityOptions{
134 Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING},
135 Weight: []uint32{828},
136 })
137 return clab0.Build()
138 }(),
139 want: EndpointsUpdate{
140 Drops: nil,
141 Localities: []Locality{
142 {
143 Endpoints: []Endpoint{{
144 Address: "addr1:314",
145 HealthStatus: EndpointHealthStatusUnhealthy,
146 Weight: 271,
147 }},
148 ID: internal.LocalityID{SubZone: "locality-1"},
149 Priority: 1,
150 Weight: 1,
151 },
152 {
153 Endpoints: []Endpoint{{
154 Address: "addr2:159",
155 HealthStatus: EndpointHealthStatusDraining,
156 Weight: 828,
157 }},
158 ID: internal.LocalityID{SubZone: "locality-2"},
159 Priority: 0,
160 Weight: 1,
161 },
162 },
163 },
164 wantErr: false,
165 },
166 {
167 name: "good duplicate locality with different priority",
168 m: func() *v3endpointpb.ClusterLoadAssignment {
169 clab0 := newClaBuilder("test", nil)
170 clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{
171 Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY},
172 Weight: []uint32{271},
173 })
174
175 clab0.addLocality("locality-1", 1, 0, []string{"addr2:159"}, &addLocalityOptions{
176 Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING},
177 Weight: []uint32{828},
178 })
179 return clab0.Build()
180 }(),
181 want: EndpointsUpdate{
182 Drops: nil,
183 Localities: []Locality{
184 {
185 Endpoints: []Endpoint{{
186 Address: "addr1:314",
187 HealthStatus: EndpointHealthStatusUnhealthy,
188 Weight: 271,
189 }},
190 ID: internal.LocalityID{SubZone: "locality-1"},
191 Priority: 1,
192 Weight: 1,
193 },
194 {
195 Endpoints: []Endpoint{{
196 Address: "addr2:159",
197 HealthStatus: EndpointHealthStatusDraining,
198 Weight: 828,
199 }},
200 ID: internal.LocalityID{SubZone: "locality-1"},
201 Priority: 0,
202 Weight: 1,
203 },
204 },
205 },
206 wantErr: false,
207 },
208 }
209 for _, tt := range tests {
210 t.Run(tt.name, func(t *testing.T) {
211 got, err := parseEDSRespProto(tt.m)
212 if (err != nil) != tt.wantErr {
213 t.Errorf("parseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr)
214 return
215 }
216 if d := cmp.Diff(got, tt.want); d != "" {
217 t.Errorf("parseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d)
218 }
219 })
220 }
221 }
222
223 func (s) TestUnmarshalEndpoints(t *testing.T) {
224 var v3EndpointsAny = testutils.MarshalAny(t, func() *v3endpointpb.ClusterLoadAssignment {
225 clab0 := newClaBuilder("test", nil)
226 clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{
227 Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY},
228 Weight: []uint32{271},
229 })
230 clab0.addLocality("locality-2", 1, 0, []string{"addr2:159"}, &addLocalityOptions{
231 Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING},
232 Weight: []uint32{828},
233 })
234 return clab0.Build()
235 }())
236
237 tests := []struct {
238 name string
239 resource *anypb.Any
240 wantName string
241 wantUpdate EndpointsUpdate
242 wantErr bool
243 }{
244 {
245 name: "non-clusterLoadAssignment resource type",
246 resource: &anypb.Any{TypeUrl: version.V3HTTPConnManagerURL},
247 wantErr: true,
248 },
249 {
250 name: "badly marshaled clusterLoadAssignment resource",
251 resource: &anypb.Any{
252 TypeUrl: version.V3EndpointsURL,
253 Value: []byte{1, 2, 3, 4},
254 },
255 wantErr: true,
256 },
257 {
258 name: "bad endpoints resource",
259 resource: testutils.MarshalAny(t, func() *v3endpointpb.ClusterLoadAssignment {
260 clab0 := newClaBuilder("test", nil)
261 clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
262 clab0.addLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
263 return clab0.Build()
264 }()),
265 wantName: "test",
266 wantErr: true,
267 },
268 {
269 name: "v3 endpoints",
270 resource: v3EndpointsAny,
271 wantName: "test",
272 wantUpdate: EndpointsUpdate{
273 Drops: nil,
274 Localities: []Locality{
275 {
276 Endpoints: []Endpoint{{
277 Address: "addr1:314",
278 HealthStatus: EndpointHealthStatusUnhealthy,
279 Weight: 271,
280 }},
281 ID: internal.LocalityID{SubZone: "locality-1"},
282 Priority: 1,
283 Weight: 1,
284 },
285 {
286 Endpoints: []Endpoint{{
287 Address: "addr2:159",
288 HealthStatus: EndpointHealthStatusDraining,
289 Weight: 828,
290 }},
291 ID: internal.LocalityID{SubZone: "locality-2"},
292 Priority: 0,
293 Weight: 1,
294 },
295 },
296 Raw: v3EndpointsAny,
297 },
298 },
299 {
300 name: "v3 endpoints wrapped",
301 resource: testutils.MarshalAny(t, &v3discoverypb.Resource{Resource: v3EndpointsAny}),
302 wantName: "test",
303 wantUpdate: EndpointsUpdate{
304 Drops: nil,
305 Localities: []Locality{
306 {
307 Endpoints: []Endpoint{{
308 Address: "addr1:314",
309 HealthStatus: EndpointHealthStatusUnhealthy,
310 Weight: 271,
311 }},
312 ID: internal.LocalityID{SubZone: "locality-1"},
313 Priority: 1,
314 Weight: 1,
315 },
316 {
317 Endpoints: []Endpoint{{
318 Address: "addr2:159",
319 HealthStatus: EndpointHealthStatusDraining,
320 Weight: 828,
321 }},
322 ID: internal.LocalityID{SubZone: "locality-2"},
323 Priority: 0,
324 Weight: 1,
325 },
326 },
327 Raw: v3EndpointsAny,
328 },
329 },
330 }
331 for _, test := range tests {
332 t.Run(test.name, func(t *testing.T) {
333 name, update, err := unmarshalEndpointsResource(test.resource)
334 if (err != nil) != test.wantErr {
335 t.Fatalf("unmarshalEndpointsResource(%s), got err: %v, wantErr: %v", pretty.ToJSON(test.resource), err, test.wantErr)
336 }
337 if name != test.wantName {
338 t.Errorf("unmarshalEndpointsResource(%s), got name: %s, want: %s", pretty.ToJSON(test.resource), name, test.wantName)
339 }
340 if diff := cmp.Diff(update, test.wantUpdate, cmpOpts); diff != "" {
341 t.Errorf("unmarshalEndpointsResource(%s), got unexpected update, diff (-got +want): %v", pretty.ToJSON(test.resource), diff)
342 }
343 })
344 }
345 }
346
347
348
349 type claBuilder struct {
350 v *v3endpointpb.ClusterLoadAssignment
351 }
352
353
354 func newClaBuilder(clusterName string, dropPercents []uint32) *claBuilder {
355 var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
356 for i, d := range dropPercents {
357 drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
358 Category: fmt.Sprintf("test-drop-%d", i),
359 DropPercentage: &v3typepb.FractionalPercent{
360 Numerator: d,
361 Denominator: v3typepb.FractionalPercent_HUNDRED,
362 },
363 })
364 }
365
366 return &claBuilder{
367 v: &v3endpointpb.ClusterLoadAssignment{
368 ClusterName: clusterName,
369 Policy: &v3endpointpb.ClusterLoadAssignment_Policy{
370 DropOverloads: drops,
371 },
372 },
373 }
374 }
375
376
377 type addLocalityOptions struct {
378 Health []v3corepb.HealthStatus
379 Weight []uint32
380 }
381
382
383 func (clab *claBuilder) addLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *addLocalityOptions) {
384 var lbEndPoints []*v3endpointpb.LbEndpoint
385 for i, a := range addrsWithPort {
386 host, portStr, err := net.SplitHostPort(a)
387 if err != nil {
388 panic("failed to split " + a)
389 }
390 port, err := strconv.Atoi(portStr)
391 if err != nil {
392 panic("failed to atoi " + portStr)
393 }
394
395 lbe := &v3endpointpb.LbEndpoint{
396 HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{
397 Endpoint: &v3endpointpb.Endpoint{
398 Address: &v3corepb.Address{
399 Address: &v3corepb.Address_SocketAddress{
400 SocketAddress: &v3corepb.SocketAddress{
401 Protocol: v3corepb.SocketAddress_TCP,
402 Address: host,
403 PortSpecifier: &v3corepb.SocketAddress_PortValue{
404 PortValue: uint32(port)}}}}}},
405 }
406 if opts != nil {
407 if i < len(opts.Health) {
408 lbe.HealthStatus = opts.Health[i]
409 }
410 if i < len(opts.Weight) {
411 lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]}
412 }
413 }
414 lbEndPoints = append(lbEndPoints, lbe)
415 }
416
417 var localityID *v3corepb.Locality
418 if subzone != "" {
419 localityID = &v3corepb.Locality{
420 Region: "",
421 Zone: "",
422 SubZone: subzone,
423 }
424 }
425
426 clab.v.Endpoints = append(clab.v.Endpoints, &v3endpointpb.LocalityLbEndpoints{
427 Locality: localityID,
428 LbEndpoints: lbEndPoints,
429 LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight},
430 Priority: priority,
431 })
432 }
433
434
435 func (clab *claBuilder) Build() *v3endpointpb.ClusterLoadAssignment {
436 return clab.v
437 }
438
View as plain text