...

Source file src/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go

Documentation: google.golang.org/grpc/xds/internal/xdsclient/xdsresource

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   */
    17  
    18  package xdsresource
    19  
    20  import (
    21  	"fmt"
    22  	"net"
    23  	"strconv"
    24  	"testing"
    25  
    26  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    27  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    28  	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    29  	v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
    30  	"github.com/google/go-cmp/cmp"
    31  	"google.golang.org/grpc/internal/pretty"
    32  	"google.golang.org/grpc/internal/testutils"
    33  	"google.golang.org/grpc/xds/internal"
    34  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
    35  	"google.golang.org/protobuf/types/known/anypb"
    36  	"google.golang.org/protobuf/types/known/wrapperspb"
    37  )
    38  
    39  func (s) TestEDSParseRespProto(t *testing.T) {
    40  	tests := []struct {
    41  		name    string
    42  		m       *v3endpointpb.ClusterLoadAssignment
    43  		want    EndpointsUpdate
    44  		wantErr bool
    45  	}{
    46  		{
    47  			name: "missing-priority",
    48  			m: func() *v3endpointpb.ClusterLoadAssignment {
    49  				clab0 := newClaBuilder("test", nil)
    50  				clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
    51  				clab0.addLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
    52  				return clab0.Build()
    53  			}(),
    54  			want:    EndpointsUpdate{},
    55  			wantErr: true,
    56  		},
    57  		{
    58  			name: "missing-locality-ID",
    59  			m: func() *v3endpointpb.ClusterLoadAssignment {
    60  				clab0 := newClaBuilder("test", nil)
    61  				clab0.addLocality("", 1, 0, []string{"addr1:314"}, nil)
    62  				return clab0.Build()
    63  			}(),
    64  			want:    EndpointsUpdate{},
    65  			wantErr: true,
    66  		},
    67  		{
    68  			name: "zero-endpoint-weight",
    69  			m: func() *v3endpointpb.ClusterLoadAssignment {
    70  				clab0 := newClaBuilder("test", nil)
    71  				clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, &addLocalityOptions{Weight: []uint32{0}})
    72  				return clab0.Build()
    73  			}(),
    74  			want:    EndpointsUpdate{},
    75  			wantErr: true,
    76  		},
    77  		{
    78  			name: "duplicate-locality-in-the-same-priority",
    79  			m: func() *v3endpointpb.ClusterLoadAssignment {
    80  				clab0 := newClaBuilder("test", nil)
    81  				clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, nil)
    82  				clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, nil) // Duplicate locality with the same priority.
    83  				return clab0.Build()
    84  			}(),
    85  			want:    EndpointsUpdate{},
    86  			wantErr: true,
    87  		},
    88  		{
    89  			name: "missing locality weight",
    90  			m: func() *v3endpointpb.ClusterLoadAssignment {
    91  				clab0 := newClaBuilder("test", nil)
    92  				clab0.addLocality("locality-1", 0, 1, []string{"addr1:314"}, &addLocalityOptions{
    93  					Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY},
    94  				})
    95  				clab0.addLocality("locality-2", 0, 0, []string{"addr2:159"}, &addLocalityOptions{
    96  					Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY},
    97  				})
    98  				return clab0.Build()
    99  			}(),
   100  			want: EndpointsUpdate{},
   101  		},
   102  		{
   103  			name: "max sum of weights at the same priority exceeded",
   104  			m: func() *v3endpointpb.ClusterLoadAssignment {
   105  				clab0 := newClaBuilder("test", nil)
   106  				clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
   107  				clab0.addLocality("locality-2", 4294967295, 1, []string{"addr2:159"}, nil)
   108  				clab0.addLocality("locality-3", 1, 1, []string{"addr2:88"}, nil)
   109  				return clab0.Build()
   110  			}(),
   111  			want:    EndpointsUpdate{},
   112  			wantErr: true,
   113  		},
   114  		{
   115  			name: "duplicate endpoint address",
   116  			m: func() *v3endpointpb.ClusterLoadAssignment {
   117  				clab0 := newClaBuilder("test", nil)
   118  				clab0.addLocality("locality-1", 1, 1, []string{"addr:997"}, nil)
   119  				clab0.addLocality("locality-2", 1, 0, []string{"addr:997"}, nil)
   120  				return clab0.Build()
   121  			}(),
   122  			want:    EndpointsUpdate{},
   123  			wantErr: true,
   124  		},
   125  		{
   126  			name: "good",
   127  			m: func() *v3endpointpb.ClusterLoadAssignment {
   128  				clab0 := newClaBuilder("test", nil)
   129  				clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{
   130  					Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY},
   131  					Weight: []uint32{271},
   132  				})
   133  				clab0.addLocality("locality-2", 1, 0, []string{"addr2:159"}, &addLocalityOptions{
   134  					Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING},
   135  					Weight: []uint32{828},
   136  				})
   137  				return clab0.Build()
   138  			}(),
   139  			want: EndpointsUpdate{
   140  				Drops: nil,
   141  				Localities: []Locality{
   142  					{
   143  						Endpoints: []Endpoint{{
   144  							Address:      "addr1:314",
   145  							HealthStatus: EndpointHealthStatusUnhealthy,
   146  							Weight:       271,
   147  						}},
   148  						ID:       internal.LocalityID{SubZone: "locality-1"},
   149  						Priority: 1,
   150  						Weight:   1,
   151  					},
   152  					{
   153  						Endpoints: []Endpoint{{
   154  							Address:      "addr2:159",
   155  							HealthStatus: EndpointHealthStatusDraining,
   156  							Weight:       828,
   157  						}},
   158  						ID:       internal.LocalityID{SubZone: "locality-2"},
   159  						Priority: 0,
   160  						Weight:   1,
   161  					},
   162  				},
   163  			},
   164  			wantErr: false,
   165  		},
   166  		{
   167  			name: "good duplicate locality with different priority",
   168  			m: func() *v3endpointpb.ClusterLoadAssignment {
   169  				clab0 := newClaBuilder("test", nil)
   170  				clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{
   171  					Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY},
   172  					Weight: []uint32{271},
   173  				})
   174  				// Same locality name, but with different priority.
   175  				clab0.addLocality("locality-1", 1, 0, []string{"addr2:159"}, &addLocalityOptions{
   176  					Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING},
   177  					Weight: []uint32{828},
   178  				})
   179  				return clab0.Build()
   180  			}(),
   181  			want: EndpointsUpdate{
   182  				Drops: nil,
   183  				Localities: []Locality{
   184  					{
   185  						Endpoints: []Endpoint{{
   186  							Address:      "addr1:314",
   187  							HealthStatus: EndpointHealthStatusUnhealthy,
   188  							Weight:       271,
   189  						}},
   190  						ID:       internal.LocalityID{SubZone: "locality-1"},
   191  						Priority: 1,
   192  						Weight:   1,
   193  					},
   194  					{
   195  						Endpoints: []Endpoint{{
   196  							Address:      "addr2:159",
   197  							HealthStatus: EndpointHealthStatusDraining,
   198  							Weight:       828,
   199  						}},
   200  						ID:       internal.LocalityID{SubZone: "locality-1"},
   201  						Priority: 0,
   202  						Weight:   1,
   203  					},
   204  				},
   205  			},
   206  			wantErr: false,
   207  		},
   208  	}
   209  	for _, tt := range tests {
   210  		t.Run(tt.name, func(t *testing.T) {
   211  			got, err := parseEDSRespProto(tt.m)
   212  			if (err != nil) != tt.wantErr {
   213  				t.Errorf("parseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr)
   214  				return
   215  			}
   216  			if d := cmp.Diff(got, tt.want); d != "" {
   217  				t.Errorf("parseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d)
   218  			}
   219  		})
   220  	}
   221  }
   222  
   223  func (s) TestUnmarshalEndpoints(t *testing.T) {
   224  	var v3EndpointsAny = testutils.MarshalAny(t, func() *v3endpointpb.ClusterLoadAssignment {
   225  		clab0 := newClaBuilder("test", nil)
   226  		clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{
   227  			Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY},
   228  			Weight: []uint32{271},
   229  		})
   230  		clab0.addLocality("locality-2", 1, 0, []string{"addr2:159"}, &addLocalityOptions{
   231  			Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING},
   232  			Weight: []uint32{828},
   233  		})
   234  		return clab0.Build()
   235  	}())
   236  
   237  	tests := []struct {
   238  		name       string
   239  		resource   *anypb.Any
   240  		wantName   string
   241  		wantUpdate EndpointsUpdate
   242  		wantErr    bool
   243  	}{
   244  		{
   245  			name:     "non-clusterLoadAssignment resource type",
   246  			resource: &anypb.Any{TypeUrl: version.V3HTTPConnManagerURL},
   247  			wantErr:  true,
   248  		},
   249  		{
   250  			name: "badly marshaled clusterLoadAssignment resource",
   251  			resource: &anypb.Any{
   252  				TypeUrl: version.V3EndpointsURL,
   253  				Value:   []byte{1, 2, 3, 4},
   254  			},
   255  			wantErr: true,
   256  		},
   257  		{
   258  			name: "bad endpoints resource",
   259  			resource: testutils.MarshalAny(t, func() *v3endpointpb.ClusterLoadAssignment {
   260  				clab0 := newClaBuilder("test", nil)
   261  				clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
   262  				clab0.addLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
   263  				return clab0.Build()
   264  			}()),
   265  			wantName: "test",
   266  			wantErr:  true,
   267  		},
   268  		{
   269  			name:     "v3 endpoints",
   270  			resource: v3EndpointsAny,
   271  			wantName: "test",
   272  			wantUpdate: EndpointsUpdate{
   273  				Drops: nil,
   274  				Localities: []Locality{
   275  					{
   276  						Endpoints: []Endpoint{{
   277  							Address:      "addr1:314",
   278  							HealthStatus: EndpointHealthStatusUnhealthy,
   279  							Weight:       271,
   280  						}},
   281  						ID:       internal.LocalityID{SubZone: "locality-1"},
   282  						Priority: 1,
   283  						Weight:   1,
   284  					},
   285  					{
   286  						Endpoints: []Endpoint{{
   287  							Address:      "addr2:159",
   288  							HealthStatus: EndpointHealthStatusDraining,
   289  							Weight:       828,
   290  						}},
   291  						ID:       internal.LocalityID{SubZone: "locality-2"},
   292  						Priority: 0,
   293  						Weight:   1,
   294  					},
   295  				},
   296  				Raw: v3EndpointsAny,
   297  			},
   298  		},
   299  		{
   300  			name:     "v3 endpoints wrapped",
   301  			resource: testutils.MarshalAny(t, &v3discoverypb.Resource{Resource: v3EndpointsAny}),
   302  			wantName: "test",
   303  			wantUpdate: EndpointsUpdate{
   304  				Drops: nil,
   305  				Localities: []Locality{
   306  					{
   307  						Endpoints: []Endpoint{{
   308  							Address:      "addr1:314",
   309  							HealthStatus: EndpointHealthStatusUnhealthy,
   310  							Weight:       271,
   311  						}},
   312  						ID:       internal.LocalityID{SubZone: "locality-1"},
   313  						Priority: 1,
   314  						Weight:   1,
   315  					},
   316  					{
   317  						Endpoints: []Endpoint{{
   318  							Address:      "addr2:159",
   319  							HealthStatus: EndpointHealthStatusDraining,
   320  							Weight:       828,
   321  						}},
   322  						ID:       internal.LocalityID{SubZone: "locality-2"},
   323  						Priority: 0,
   324  						Weight:   1,
   325  					},
   326  				},
   327  				Raw: v3EndpointsAny,
   328  			},
   329  		},
   330  	}
   331  	for _, test := range tests {
   332  		t.Run(test.name, func(t *testing.T) {
   333  			name, update, err := unmarshalEndpointsResource(test.resource)
   334  			if (err != nil) != test.wantErr {
   335  				t.Fatalf("unmarshalEndpointsResource(%s), got err: %v, wantErr: %v", pretty.ToJSON(test.resource), err, test.wantErr)
   336  			}
   337  			if name != test.wantName {
   338  				t.Errorf("unmarshalEndpointsResource(%s), got name: %s, want: %s", pretty.ToJSON(test.resource), name, test.wantName)
   339  			}
   340  			if diff := cmp.Diff(update, test.wantUpdate, cmpOpts); diff != "" {
   341  				t.Errorf("unmarshalEndpointsResource(%s), got unexpected update, diff (-got +want): %v", pretty.ToJSON(test.resource), diff)
   342  			}
   343  		})
   344  	}
   345  }
   346  
   347  // claBuilder builds a ClusterLoadAssignment, aka EDS
   348  // response.
   349  type claBuilder struct {
   350  	v *v3endpointpb.ClusterLoadAssignment
   351  }
   352  
   353  // newClaBuilder creates a claBuilder.
   354  func newClaBuilder(clusterName string, dropPercents []uint32) *claBuilder {
   355  	var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
   356  	for i, d := range dropPercents {
   357  		drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
   358  			Category: fmt.Sprintf("test-drop-%d", i),
   359  			DropPercentage: &v3typepb.FractionalPercent{
   360  				Numerator:   d,
   361  				Denominator: v3typepb.FractionalPercent_HUNDRED,
   362  			},
   363  		})
   364  	}
   365  
   366  	return &claBuilder{
   367  		v: &v3endpointpb.ClusterLoadAssignment{
   368  			ClusterName: clusterName,
   369  			Policy: &v3endpointpb.ClusterLoadAssignment_Policy{
   370  				DropOverloads: drops,
   371  			},
   372  		},
   373  	}
   374  }
   375  
   376  // addLocalityOptions contains options when adding locality to the builder.
   377  type addLocalityOptions struct {
   378  	Health []v3corepb.HealthStatus
   379  	Weight []uint32
   380  }
   381  
   382  // addLocality adds a locality to the builder.
   383  func (clab *claBuilder) addLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *addLocalityOptions) {
   384  	var lbEndPoints []*v3endpointpb.LbEndpoint
   385  	for i, a := range addrsWithPort {
   386  		host, portStr, err := net.SplitHostPort(a)
   387  		if err != nil {
   388  			panic("failed to split " + a)
   389  		}
   390  		port, err := strconv.Atoi(portStr)
   391  		if err != nil {
   392  			panic("failed to atoi " + portStr)
   393  		}
   394  
   395  		lbe := &v3endpointpb.LbEndpoint{
   396  			HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{
   397  				Endpoint: &v3endpointpb.Endpoint{
   398  					Address: &v3corepb.Address{
   399  						Address: &v3corepb.Address_SocketAddress{
   400  							SocketAddress: &v3corepb.SocketAddress{
   401  								Protocol: v3corepb.SocketAddress_TCP,
   402  								Address:  host,
   403  								PortSpecifier: &v3corepb.SocketAddress_PortValue{
   404  									PortValue: uint32(port)}}}}}},
   405  		}
   406  		if opts != nil {
   407  			if i < len(opts.Health) {
   408  				lbe.HealthStatus = opts.Health[i]
   409  			}
   410  			if i < len(opts.Weight) {
   411  				lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]}
   412  			}
   413  		}
   414  		lbEndPoints = append(lbEndPoints, lbe)
   415  	}
   416  
   417  	var localityID *v3corepb.Locality
   418  	if subzone != "" {
   419  		localityID = &v3corepb.Locality{
   420  			Region:  "",
   421  			Zone:    "",
   422  			SubZone: subzone,
   423  		}
   424  	}
   425  
   426  	clab.v.Endpoints = append(clab.v.Endpoints, &v3endpointpb.LocalityLbEndpoints{
   427  		Locality:            localityID,
   428  		LbEndpoints:         lbEndPoints,
   429  		LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight},
   430  		Priority:            priority,
   431  	})
   432  }
   433  
   434  // Build builds ClusterLoadAssignment.
   435  func (clab *claBuilder) Build() *v3endpointpb.ClusterLoadAssignment {
   436  	return clab.v
   437  }
   438  

View as plain text