1
17
18 package xdsresource
19
20 import (
21 "fmt"
22 "math"
23 "net"
24 "strconv"
25
26 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
27 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
28 v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
29 "google.golang.org/grpc/internal/pretty"
30 "google.golang.org/grpc/xds/internal"
31 "google.golang.org/protobuf/proto"
32 "google.golang.org/protobuf/types/known/anypb"
33 )
34
35 func unmarshalEndpointsResource(r *anypb.Any) (string, EndpointsUpdate, error) {
36 r, err := UnwrapResource(r)
37 if err != nil {
38 return "", EndpointsUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
39 }
40
41 if !IsEndpointsResource(r.GetTypeUrl()) {
42 return "", EndpointsUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl())
43 }
44
45 cla := &v3endpointpb.ClusterLoadAssignment{}
46 if err := proto.Unmarshal(r.GetValue(), cla); err != nil {
47 return "", EndpointsUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
48 }
49
50 u, err := parseEDSRespProto(cla)
51 if err != nil {
52 return cla.GetClusterName(), EndpointsUpdate{}, err
53 }
54 u.Raw = r
55 return cla.GetClusterName(), u, nil
56 }
57
58 func parseAddress(socketAddress *v3corepb.SocketAddress) string {
59 return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue())))
60 }
61
62 func parseDropPolicy(dropPolicy *v3endpointpb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig {
63 percentage := dropPolicy.GetDropPercentage()
64 var (
65 numerator = percentage.GetNumerator()
66 denominator uint32
67 )
68 switch percentage.GetDenominator() {
69 case v3typepb.FractionalPercent_HUNDRED:
70 denominator = 100
71 case v3typepb.FractionalPercent_TEN_THOUSAND:
72 denominator = 10000
73 case v3typepb.FractionalPercent_MILLION:
74 denominator = 1000000
75 }
76 return OverloadDropConfig{
77 Category: dropPolicy.GetCategory(),
78 Numerator: numerator,
79 Denominator: denominator,
80 }
81 }
82
83 func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs map[string]bool) ([]Endpoint, error) {
84 endpoints := make([]Endpoint, 0, len(lbEndpoints))
85 for _, lbEndpoint := range lbEndpoints {
86
87
88
89 weight := uint32(1)
90 if w := lbEndpoint.GetLoadBalancingWeight(); w != nil {
91 if w.GetValue() == 0 {
92 return nil, fmt.Errorf("EDS response contains an endpoint with zero weight: %+v", lbEndpoint)
93 }
94 weight = w.GetValue()
95 }
96 addr := parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress())
97 if uniqueEndpointAddrs[addr] {
98 return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr)
99 }
100 uniqueEndpointAddrs[addr] = true
101 endpoints = append(endpoints, Endpoint{
102 HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
103 Address: addr,
104 Weight: weight,
105 })
106 }
107 return endpoints, nil
108 }
109
110 func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) {
111 ret := EndpointsUpdate{}
112 for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
113 ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
114 }
115 priorities := make(map[uint32]map[string]bool)
116 sumOfWeights := make(map[uint32]uint64)
117 uniqueEndpointAddrs := make(map[string]bool)
118 for _, locality := range m.Endpoints {
119 l := locality.GetLocality()
120 if l == nil {
121 return EndpointsUpdate{}, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
122 }
123 weight := locality.GetLoadBalancingWeight().GetValue()
124 if weight == 0 {
125 logger.Warningf("Ignoring locality %s with weight 0", pretty.ToJSON(l))
126 continue
127 }
128 priority := locality.GetPriority()
129 sumOfWeights[priority] += uint64(weight)
130 if sumOfWeights[priority] > math.MaxUint32 {
131 return EndpointsUpdate{}, fmt.Errorf("sum of weights of localities at the same priority %d exceeded maximal value", priority)
132 }
133 localitiesWithPriority := priorities[priority]
134 if localitiesWithPriority == nil {
135 localitiesWithPriority = make(map[string]bool)
136 priorities[priority] = localitiesWithPriority
137 }
138 lid := internal.LocalityID{
139 Region: l.Region,
140 Zone: l.Zone,
141 SubZone: l.SubZone,
142 }
143 lidStr, _ := lid.ToString()
144
145
146
147
148
149
150
151
152
153
154
155 if localitiesWithPriority[lidStr] {
156 return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority)
157 }
158 localitiesWithPriority[lidStr] = true
159 endpoints, err := parseEndpoints(locality.GetLbEndpoints(), uniqueEndpointAddrs)
160 if err != nil {
161 return EndpointsUpdate{}, err
162 }
163 ret.Localities = append(ret.Localities, Locality{
164 ID: lid,
165 Endpoints: endpoints,
166 Weight: weight,
167 Priority: priority,
168 })
169 }
170 for i := 0; i < len(priorities); i++ {
171 if _, ok := priorities[uint32(i)]; !ok {
172 return EndpointsUpdate{}, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities)
173 }
174 }
175 return ret, nil
176 }
177
View as plain text