1
17
18 package xdsresource
19
20 import (
21 "fmt"
22 "math"
23 "regexp"
24 "strings"
25 "time"
26
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/internal/xds/matcher"
29 "google.golang.org/grpc/xds/internal/clusterspecifier"
30 "google.golang.org/protobuf/proto"
31 "google.golang.org/protobuf/types/known/anypb"
32
33 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
34 v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
35 )
36
37 func unmarshalRouteConfigResource(r *anypb.Any) (string, RouteConfigUpdate, error) {
38 r, err := UnwrapResource(r)
39 if err != nil {
40 return "", RouteConfigUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
41 }
42
43 if !IsRouteConfigResource(r.GetTypeUrl()) {
44 return "", RouteConfigUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl())
45 }
46 rc := &v3routepb.RouteConfiguration{}
47 if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
48 return "", RouteConfigUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
49 }
50
51 u, err := generateRDSUpdateFromRouteConfiguration(rc)
52 if err != nil {
53 return rc.GetName(), RouteConfigUpdate{}, err
54 }
55 u.Raw = r
56 return rc.GetName(), u, nil
57 }
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration) (RouteConfigUpdate, error) {
76 vhs := make([]*VirtualHost, 0, len(rc.GetVirtualHosts()))
77 csps, err := processClusterSpecifierPlugins(rc.ClusterSpecifierPlugins)
78 if err != nil {
79 return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
80 }
81
82
83
84 var cspNames = make(map[string]bool)
85 for _, vh := range rc.GetVirtualHosts() {
86 routes, cspNs, err := routesProtoToSlice(vh.Routes, csps)
87 if err != nil {
88 return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
89 }
90 for n := range cspNs {
91 cspNames[n] = true
92 }
93 rc, err := generateRetryConfig(vh.GetRetryPolicy())
94 if err != nil {
95 return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
96 }
97 vhOut := &VirtualHost{
98 Domains: vh.GetDomains(),
99 Routes: routes,
100 RetryConfig: rc,
101 }
102 cfgs, err := processHTTPFilterOverrides(vh.GetTypedPerFilterConfig())
103 if err != nil {
104 return RouteConfigUpdate{}, fmt.Errorf("virtual host %+v: %v", vh, err)
105 }
106 vhOut.HTTPFilterConfigOverride = cfgs
107 vhs = append(vhs, vhOut)
108 }
109
110
111
112
113 for name := range csps {
114 if !cspNames[name] {
115 delete(csps, name)
116 }
117 }
118
119 return RouteConfigUpdate{VirtualHosts: vhs, ClusterSpecifierPlugins: csps}, nil
120 }
121
122 func processClusterSpecifierPlugins(csps []*v3routepb.ClusterSpecifierPlugin) (map[string]clusterspecifier.BalancerConfig, error) {
123 cspCfgs := make(map[string]clusterspecifier.BalancerConfig)
124
125
126
127 for _, csp := range csps {
128 cs := clusterspecifier.Get(csp.GetExtension().GetTypedConfig().GetTypeUrl())
129 if cs == nil {
130 if csp.GetIsOptional() {
131
132
133 cspCfgs[csp.GetExtension().GetName()] = nil
134 continue
135 }
136
137
138 return nil, fmt.Errorf("cluster specifier %q of type %q was not found", csp.GetExtension().GetName(), csp.GetExtension().GetTypedConfig().GetTypeUrl())
139 }
140 lbCfg, err := cs.ParseClusterSpecifierConfig(csp.GetExtension().GetTypedConfig())
141 if err != nil {
142
143
144
145 return nil, fmt.Errorf("error: %q parsing config %q for cluster specifier %q of type %q", err, csp.GetExtension().GetTypedConfig(), csp.GetExtension().GetName(), csp.GetExtension().GetTypedConfig().GetTypeUrl())
146 }
147
148
149
150 cspCfgs[csp.GetExtension().GetName()] = lbCfg
151 }
152 return cspCfgs, nil
153 }
154
155 func generateRetryConfig(rp *v3routepb.RetryPolicy) (*RetryConfig, error) {
156 if rp == nil {
157 return nil, nil
158 }
159
160 cfg := &RetryConfig{RetryOn: make(map[codes.Code]bool)}
161 for _, s := range strings.Split(rp.GetRetryOn(), ",") {
162 switch strings.TrimSpace(strings.ToLower(s)) {
163 case "cancelled":
164 cfg.RetryOn[codes.Canceled] = true
165 case "deadline-exceeded":
166 cfg.RetryOn[codes.DeadlineExceeded] = true
167 case "internal":
168 cfg.RetryOn[codes.Internal] = true
169 case "resource-exhausted":
170 cfg.RetryOn[codes.ResourceExhausted] = true
171 case "unavailable":
172 cfg.RetryOn[codes.Unavailable] = true
173 }
174 }
175
176 if rp.NumRetries == nil {
177 cfg.NumRetries = 1
178 } else {
179 cfg.NumRetries = rp.GetNumRetries().Value
180 if cfg.NumRetries < 1 {
181 return nil, fmt.Errorf("retry_policy.num_retries = %v; must be >= 1", cfg.NumRetries)
182 }
183 }
184
185 backoff := rp.GetRetryBackOff()
186 if backoff == nil {
187 cfg.RetryBackoff.BaseInterval = 25 * time.Millisecond
188 } else {
189 cfg.RetryBackoff.BaseInterval = backoff.GetBaseInterval().AsDuration()
190 if cfg.RetryBackoff.BaseInterval <= 0 {
191 return nil, fmt.Errorf("retry_policy.base_interval = %v; must be > 0", cfg.RetryBackoff.BaseInterval)
192 }
193 }
194 if max := backoff.GetMaxInterval(); max == nil {
195 cfg.RetryBackoff.MaxInterval = 10 * cfg.RetryBackoff.BaseInterval
196 } else {
197 cfg.RetryBackoff.MaxInterval = max.AsDuration()
198 if cfg.RetryBackoff.MaxInterval <= 0 {
199 return nil, fmt.Errorf("retry_policy.max_interval = %v; must be > 0", cfg.RetryBackoff.MaxInterval)
200 }
201 }
202
203 if len(cfg.RetryOn) == 0 {
204 return &RetryConfig{}, nil
205 }
206 return cfg, nil
207 }
208
209 func routesProtoToSlice(routes []*v3routepb.Route, csps map[string]clusterspecifier.BalancerConfig) ([]*Route, map[string]bool, error) {
210 var routesRet []*Route
211 var cspNames = make(map[string]bool)
212 for _, r := range routes {
213 match := r.GetMatch()
214 if match == nil {
215 return nil, nil, fmt.Errorf("route %+v doesn't have a match", r)
216 }
217
218 if len(match.GetQueryParameters()) != 0 {
219
220 logger.Warningf("Ignoring route %+v with query parameter matchers", r)
221 continue
222 }
223
224 pathSp := match.GetPathSpecifier()
225 if pathSp == nil {
226 return nil, nil, fmt.Errorf("route %+v doesn't have a path specifier", r)
227 }
228
229 var route Route
230 switch pt := pathSp.(type) {
231 case *v3routepb.RouteMatch_Prefix:
232 route.Prefix = &pt.Prefix
233 case *v3routepb.RouteMatch_Path:
234 route.Path = &pt.Path
235 case *v3routepb.RouteMatch_SafeRegex:
236 regex := pt.SafeRegex.GetRegex()
237 re, err := regexp.Compile(regex)
238 if err != nil {
239 return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex)
240 }
241 route.Regex = re
242 default:
243 return nil, nil, fmt.Errorf("route %+v has an unrecognized path specifier: %+v", r, pt)
244 }
245
246 if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil {
247 route.CaseInsensitive = !caseSensitive.Value
248 }
249
250 for _, h := range match.GetHeaders() {
251 var header HeaderMatcher
252 switch ht := h.GetHeaderMatchSpecifier().(type) {
253 case *v3routepb.HeaderMatcher_ExactMatch:
254 header.ExactMatch = &ht.ExactMatch
255 case *v3routepb.HeaderMatcher_SafeRegexMatch:
256 regex := ht.SafeRegexMatch.GetRegex()
257 re, err := regexp.Compile(regex)
258 if err != nil {
259 return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex)
260 }
261 header.RegexMatch = re
262 case *v3routepb.HeaderMatcher_RangeMatch:
263 header.RangeMatch = &Int64Range{
264 Start: ht.RangeMatch.Start,
265 End: ht.RangeMatch.End,
266 }
267 case *v3routepb.HeaderMatcher_PresentMatch:
268 header.PresentMatch = &ht.PresentMatch
269 case *v3routepb.HeaderMatcher_PrefixMatch:
270 header.PrefixMatch = &ht.PrefixMatch
271 case *v3routepb.HeaderMatcher_SuffixMatch:
272 header.SuffixMatch = &ht.SuffixMatch
273 case *v3routepb.HeaderMatcher_StringMatch:
274 sm, err := matcher.StringMatcherFromProto(ht.StringMatch)
275 if err != nil {
276 return nil, nil, fmt.Errorf("route %+v has an invalid string matcher: %v", err, ht.StringMatch)
277 }
278 header.StringMatch = &sm
279 default:
280 return nil, nil, fmt.Errorf("route %+v has an unrecognized header matcher: %+v", r, ht)
281 }
282 header.Name = h.GetName()
283 invert := h.GetInvertMatch()
284 header.InvertMatch = &invert
285 route.Headers = append(route.Headers, &header)
286 }
287
288 if fr := match.GetRuntimeFraction(); fr != nil {
289 d := fr.GetDefaultValue()
290 n := d.GetNumerator()
291 switch d.GetDenominator() {
292 case v3typepb.FractionalPercent_HUNDRED:
293 n *= 10000
294 case v3typepb.FractionalPercent_TEN_THOUSAND:
295 n *= 100
296 case v3typepb.FractionalPercent_MILLION:
297 }
298 route.Fraction = &n
299 }
300
301 switch r.GetAction().(type) {
302 case *v3routepb.Route_Route:
303 route.WeightedClusters = make(map[string]WeightedCluster)
304 action := r.GetRoute()
305
306
307 hp, err := hashPoliciesProtoToSlice(action.HashPolicy)
308 if err != nil {
309 return nil, nil, err
310 }
311 route.HashPolicies = hp
312
313 switch a := action.GetClusterSpecifier().(type) {
314 case *v3routepb.RouteAction_Cluster:
315 route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
316 case *v3routepb.RouteAction_WeightedClusters:
317 wcs := a.WeightedClusters
318 var totalWeight uint64
319 for _, c := range wcs.Clusters {
320 w := c.GetWeight().GetValue()
321 if w == 0 {
322 continue
323 }
324 totalWeight += uint64(w)
325 if totalWeight > math.MaxUint32 {
326 return nil, nil, fmt.Errorf("xds: total weight of clusters exceeds MaxUint32")
327 }
328 wc := WeightedCluster{Weight: w}
329 cfgs, err := processHTTPFilterOverrides(c.GetTypedPerFilterConfig())
330 if err != nil {
331 return nil, nil, fmt.Errorf("route %+v, action %+v: %v", r, a, err)
332 }
333 wc.HTTPFilterConfigOverride = cfgs
334 route.WeightedClusters[c.GetName()] = wc
335 }
336 if totalWeight == 0 {
337 return nil, nil, fmt.Errorf("route %+v, action %+v, has no valid cluster in WeightedCluster action", r, a)
338 }
339 case *v3routepb.RouteAction_ClusterSpecifierPlugin:
340
341
342
343
344
345
346
347
348
349
350
351
352 if _, ok := csps[a.ClusterSpecifierPlugin]; !ok {
353
354
355
356
357 return nil, nil, fmt.Errorf("route %+v, action %+v, specifies a cluster specifier plugin %+v that is not in Route Configuration", r, a, a.ClusterSpecifierPlugin)
358 }
359 if csps[a.ClusterSpecifierPlugin] == nil {
360 logger.Warningf("Ignoring route %+v with optional and unsupported cluster specifier plugin %+v", r, a.ClusterSpecifierPlugin)
361 continue
362 }
363 cspNames[a.ClusterSpecifierPlugin] = true
364 route.ClusterSpecifierPlugin = a.ClusterSpecifierPlugin
365 default:
366 logger.Warningf("Ignoring route %+v with unknown ClusterSpecifier %+v", r, a)
367 continue
368 }
369
370 msd := action.GetMaxStreamDuration()
371
372 dur := msd.GetGrpcTimeoutHeaderMax()
373 if dur == nil {
374 dur = msd.GetMaxStreamDuration()
375 }
376 if dur != nil {
377 d := dur.AsDuration()
378 route.MaxStreamDuration = &d
379 }
380
381 route.RetryConfig, err = generateRetryConfig(action.GetRetryPolicy())
382 if err != nil {
383 return nil, nil, fmt.Errorf("route %+v, action %+v: %v", r, action, err)
384 }
385
386 route.ActionType = RouteActionRoute
387
388 case *v3routepb.Route_NonForwardingAction:
389
390 route.ActionType = RouteActionNonForwardingAction
391 default:
392 route.ActionType = RouteActionUnsupported
393 }
394
395 cfgs, err := processHTTPFilterOverrides(r.GetTypedPerFilterConfig())
396 if err != nil {
397 return nil, nil, fmt.Errorf("route %+v: %v", r, err)
398 }
399 route.HTTPFilterConfigOverride = cfgs
400 routesRet = append(routesRet, &route)
401 }
402 return routesRet, cspNames, nil
403 }
404
405 func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy) ([]*HashPolicy, error) {
406 var hashPoliciesRet []*HashPolicy
407 for _, p := range policies {
408 policy := HashPolicy{Terminal: p.Terminal}
409 switch p.GetPolicySpecifier().(type) {
410 case *v3routepb.RouteAction_HashPolicy_Header_:
411 policy.HashPolicyType = HashPolicyTypeHeader
412 policy.HeaderName = p.GetHeader().GetHeaderName()
413 if rr := p.GetHeader().GetRegexRewrite(); rr != nil {
414 regex := rr.GetPattern().GetRegex()
415 re, err := regexp.Compile(regex)
416 if err != nil {
417 return nil, fmt.Errorf("hash policy %+v contains an invalid regex %q", p, regex)
418 }
419 policy.Regex = re
420 policy.RegexSubstitution = rr.GetSubstitution()
421 }
422 case *v3routepb.RouteAction_HashPolicy_FilterState_:
423 if p.GetFilterState().GetKey() != "io.grpc.channel_id" {
424 logger.Warningf("Ignoring hash policy %+v with invalid key for filter state policy %q", p, p.GetFilterState().GetKey())
425 continue
426 }
427 policy.HashPolicyType = HashPolicyTypeChannelID
428 default:
429 logger.Warningf("Ignoring unsupported hash policy %T", p.GetPolicySpecifier())
430 continue
431 }
432
433 hashPoliciesRet = append(hashPoliciesRet, &policy)
434 }
435 return hashPoliciesRet, nil
436 }
437
View as plain text