...

Source file src/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go

Documentation: google.golang.org/grpc/xds/internal/xdsclient/xdsresource

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   */
    17  
    18  package xdsresource
    19  
    20  import (
    21  	"fmt"
    22  	"math"
    23  	"net"
    24  	"strconv"
    25  
    26  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    27  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    28  	v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
    29  	"google.golang.org/grpc/internal/pretty"
    30  	"google.golang.org/grpc/xds/internal"
    31  	"google.golang.org/protobuf/proto"
    32  	"google.golang.org/protobuf/types/known/anypb"
    33  )
    34  
    35  func unmarshalEndpointsResource(r *anypb.Any) (string, EndpointsUpdate, error) {
    36  	r, err := UnwrapResource(r)
    37  	if err != nil {
    38  		return "", EndpointsUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
    39  	}
    40  
    41  	if !IsEndpointsResource(r.GetTypeUrl()) {
    42  		return "", EndpointsUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl())
    43  	}
    44  
    45  	cla := &v3endpointpb.ClusterLoadAssignment{}
    46  	if err := proto.Unmarshal(r.GetValue(), cla); err != nil {
    47  		return "", EndpointsUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
    48  	}
    49  
    50  	u, err := parseEDSRespProto(cla)
    51  	if err != nil {
    52  		return cla.GetClusterName(), EndpointsUpdate{}, err
    53  	}
    54  	u.Raw = r
    55  	return cla.GetClusterName(), u, nil
    56  }
    57  
    58  func parseAddress(socketAddress *v3corepb.SocketAddress) string {
    59  	return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue())))
    60  }
    61  
    62  func parseDropPolicy(dropPolicy *v3endpointpb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig {
    63  	percentage := dropPolicy.GetDropPercentage()
    64  	var (
    65  		numerator   = percentage.GetNumerator()
    66  		denominator uint32
    67  	)
    68  	switch percentage.GetDenominator() {
    69  	case v3typepb.FractionalPercent_HUNDRED:
    70  		denominator = 100
    71  	case v3typepb.FractionalPercent_TEN_THOUSAND:
    72  		denominator = 10000
    73  	case v3typepb.FractionalPercent_MILLION:
    74  		denominator = 1000000
    75  	}
    76  	return OverloadDropConfig{
    77  		Category:    dropPolicy.GetCategory(),
    78  		Numerator:   numerator,
    79  		Denominator: denominator,
    80  	}
    81  }
    82  
    83  func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs map[string]bool) ([]Endpoint, error) {
    84  	endpoints := make([]Endpoint, 0, len(lbEndpoints))
    85  	for _, lbEndpoint := range lbEndpoints {
    86  		// If the load_balancing_weight field is specified, it must be set to a
    87  		// value of at least 1.  If unspecified, each host is presumed to have
    88  		// equal weight in a locality.
    89  		weight := uint32(1)
    90  		if w := lbEndpoint.GetLoadBalancingWeight(); w != nil {
    91  			if w.GetValue() == 0 {
    92  				return nil, fmt.Errorf("EDS response contains an endpoint with zero weight: %+v", lbEndpoint)
    93  			}
    94  			weight = w.GetValue()
    95  		}
    96  		addr := parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress())
    97  		if uniqueEndpointAddrs[addr] {
    98  			return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr)
    99  		}
   100  		uniqueEndpointAddrs[addr] = true
   101  		endpoints = append(endpoints, Endpoint{
   102  			HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
   103  			Address:      addr,
   104  			Weight:       weight,
   105  		})
   106  	}
   107  	return endpoints, nil
   108  }
   109  
   110  func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) {
   111  	ret := EndpointsUpdate{}
   112  	for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
   113  		ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
   114  	}
   115  	priorities := make(map[uint32]map[string]bool)
   116  	sumOfWeights := make(map[uint32]uint64)
   117  	uniqueEndpointAddrs := make(map[string]bool)
   118  	for _, locality := range m.Endpoints {
   119  		l := locality.GetLocality()
   120  		if l == nil {
   121  			return EndpointsUpdate{}, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
   122  		}
   123  		weight := locality.GetLoadBalancingWeight().GetValue()
   124  		if weight == 0 {
   125  			logger.Warningf("Ignoring locality %s with weight 0", pretty.ToJSON(l))
   126  			continue
   127  		}
   128  		priority := locality.GetPriority()
   129  		sumOfWeights[priority] += uint64(weight)
   130  		if sumOfWeights[priority] > math.MaxUint32 {
   131  			return EndpointsUpdate{}, fmt.Errorf("sum of weights of localities at the same priority %d exceeded maximal value", priority)
   132  		}
   133  		localitiesWithPriority := priorities[priority]
   134  		if localitiesWithPriority == nil {
   135  			localitiesWithPriority = make(map[string]bool)
   136  			priorities[priority] = localitiesWithPriority
   137  		}
   138  		lid := internal.LocalityID{
   139  			Region:  l.Region,
   140  			Zone:    l.Zone,
   141  			SubZone: l.SubZone,
   142  		}
   143  		lidStr, _ := lid.ToString()
   144  
   145  		// "Since an xDS configuration can place a given locality under multiple
   146  		// priorities, it is possible to see locality weight attributes with
   147  		// different values for the same locality." - A52
   148  		//
   149  		// This is handled in the client by emitting the locality weight
   150  		// specified for the priority it is specified in. If the same locality
   151  		// has a different weight in two priorities, each priority will specify
   152  		// a locality with the locality weight specified for that priority, and
   153  		// thus the subsequent tree of balancers linked to that priority will
   154  		// use that locality weight as well.
   155  		if localitiesWithPriority[lidStr] {
   156  			return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority)
   157  		}
   158  		localitiesWithPriority[lidStr] = true
   159  		endpoints, err := parseEndpoints(locality.GetLbEndpoints(), uniqueEndpointAddrs)
   160  		if err != nil {
   161  			return EndpointsUpdate{}, err
   162  		}
   163  		ret.Localities = append(ret.Localities, Locality{
   164  			ID:        lid,
   165  			Endpoints: endpoints,
   166  			Weight:    weight,
   167  			Priority:  priority,
   168  		})
   169  	}
   170  	for i := 0; i < len(priorities); i++ {
   171  		if _, ok := priorities[uint32(i)]; !ok {
   172  			return EndpointsUpdate{}, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities)
   173  		}
   174  	}
   175  	return ret, nil
   176  }
   177  

View as plain text