...

Source file src/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_rds.go

Documentation: google.golang.org/grpc/xds/internal/xdsclient/xdsresource

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   */
    17  
    18  package xdsresource
    19  
    20  import (
    21  	"fmt"
    22  	"math"
    23  	"regexp"
    24  	"strings"
    25  	"time"
    26  
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/internal/xds/matcher"
    29  	"google.golang.org/grpc/xds/internal/clusterspecifier"
    30  	"google.golang.org/protobuf/proto"
    31  	"google.golang.org/protobuf/types/known/anypb"
    32  
    33  	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    34  	v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
    35  )
    36  
    37  func unmarshalRouteConfigResource(r *anypb.Any) (string, RouteConfigUpdate, error) {
    38  	r, err := UnwrapResource(r)
    39  	if err != nil {
    40  		return "", RouteConfigUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
    41  	}
    42  
    43  	if !IsRouteConfigResource(r.GetTypeUrl()) {
    44  		return "", RouteConfigUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl())
    45  	}
    46  	rc := &v3routepb.RouteConfiguration{}
    47  	if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
    48  		return "", RouteConfigUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
    49  	}
    50  
    51  	u, err := generateRDSUpdateFromRouteConfiguration(rc)
    52  	if err != nil {
    53  		return rc.GetName(), RouteConfigUpdate{}, err
    54  	}
    55  	u.Raw = r
    56  	return rc.GetName(), u, nil
    57  }
    58  
    59  // generateRDSUpdateFromRouteConfiguration checks if the provided
    60  // RouteConfiguration meets the expected criteria. If so, it returns a
    61  // RouteConfigUpdate with nil error.
    62  //
    63  // A RouteConfiguration resource is considered valid when only if it contains a
    64  // VirtualHost whose domain field matches the server name from the URI passed
    65  // to the gRPC channel, and it contains a clusterName or a weighted cluster.
    66  //
    67  // The RouteConfiguration includes a list of virtualHosts, which may have zero
    68  // or more elements. We are interested in the element whose domains field
    69  // matches the server name specified in the "xds:" URI. The only field in the
    70  // VirtualHost proto that the we are interested in is the list of routes. We
    71  // only look at the last route in the list (the default route), whose match
    72  // field must be empty and whose route field must be set.  Inside that route
    73  // message, the cluster field will contain the clusterName or weighted clusters
    74  // we are looking for.
    75  func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration) (RouteConfigUpdate, error) {
    76  	vhs := make([]*VirtualHost, 0, len(rc.GetVirtualHosts()))
    77  	csps, err := processClusterSpecifierPlugins(rc.ClusterSpecifierPlugins)
    78  	if err != nil {
    79  		return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
    80  	}
    81  	// cspNames represents all the cluster specifiers referenced by Route
    82  	// Actions - any cluster specifiers not referenced by a Route Action can be
    83  	// ignored and not emitted by the xdsclient.
    84  	var cspNames = make(map[string]bool)
    85  	for _, vh := range rc.GetVirtualHosts() {
    86  		routes, cspNs, err := routesProtoToSlice(vh.Routes, csps)
    87  		if err != nil {
    88  			return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
    89  		}
    90  		for n := range cspNs {
    91  			cspNames[n] = true
    92  		}
    93  		rc, err := generateRetryConfig(vh.GetRetryPolicy())
    94  		if err != nil {
    95  			return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
    96  		}
    97  		vhOut := &VirtualHost{
    98  			Domains:     vh.GetDomains(),
    99  			Routes:      routes,
   100  			RetryConfig: rc,
   101  		}
   102  		cfgs, err := processHTTPFilterOverrides(vh.GetTypedPerFilterConfig())
   103  		if err != nil {
   104  			return RouteConfigUpdate{}, fmt.Errorf("virtual host %+v: %v", vh, err)
   105  		}
   106  		vhOut.HTTPFilterConfigOverride = cfgs
   107  		vhs = append(vhs, vhOut)
   108  	}
   109  
   110  	// "For any entry in the RouteConfiguration.cluster_specifier_plugins not
   111  	// referenced by an enclosed ActionType's cluster_specifier_plugin, the xDS
   112  	// client should not provide it to its consumers." - RLS in xDS Design
   113  	for name := range csps {
   114  		if !cspNames[name] {
   115  			delete(csps, name)
   116  		}
   117  	}
   118  
   119  	return RouteConfigUpdate{VirtualHosts: vhs, ClusterSpecifierPlugins: csps}, nil
   120  }
   121  
   122  func processClusterSpecifierPlugins(csps []*v3routepb.ClusterSpecifierPlugin) (map[string]clusterspecifier.BalancerConfig, error) {
   123  	cspCfgs := make(map[string]clusterspecifier.BalancerConfig)
   124  	// "The xDS client will inspect all elements of the
   125  	// cluster_specifier_plugins field looking up a plugin based on the
   126  	// extension.typed_config of each." - RLS in xDS design
   127  	for _, csp := range csps {
   128  		cs := clusterspecifier.Get(csp.GetExtension().GetTypedConfig().GetTypeUrl())
   129  		if cs == nil {
   130  			if csp.GetIsOptional() {
   131  				// "If a plugin is not supported but has is_optional set, then
   132  				// we will ignore any routes that point to that plugin"
   133  				cspCfgs[csp.GetExtension().GetName()] = nil
   134  				continue
   135  			}
   136  			// "If no plugin is registered for it, the resource will be NACKed."
   137  			// - RLS in xDS design
   138  			return nil, fmt.Errorf("cluster specifier %q of type %q was not found", csp.GetExtension().GetName(), csp.GetExtension().GetTypedConfig().GetTypeUrl())
   139  		}
   140  		lbCfg, err := cs.ParseClusterSpecifierConfig(csp.GetExtension().GetTypedConfig())
   141  		if err != nil {
   142  			// "If a plugin is found, the value of the typed_config field will
   143  			// be passed to it's conversion method, and if an error is
   144  			// encountered, the resource will be NACKED." - RLS in xDS design
   145  			return nil, fmt.Errorf("error: %q parsing config %q for cluster specifier %q of type %q", err, csp.GetExtension().GetTypedConfig(), csp.GetExtension().GetName(), csp.GetExtension().GetTypedConfig().GetTypeUrl())
   146  		}
   147  		// "If all cluster specifiers are valid, the xDS client will store the
   148  		// configurations in a map keyed by the name of the extension instance." -
   149  		// RLS in xDS Design
   150  		cspCfgs[csp.GetExtension().GetName()] = lbCfg
   151  	}
   152  	return cspCfgs, nil
   153  }
   154  
   155  func generateRetryConfig(rp *v3routepb.RetryPolicy) (*RetryConfig, error) {
   156  	if rp == nil {
   157  		return nil, nil
   158  	}
   159  
   160  	cfg := &RetryConfig{RetryOn: make(map[codes.Code]bool)}
   161  	for _, s := range strings.Split(rp.GetRetryOn(), ",") {
   162  		switch strings.TrimSpace(strings.ToLower(s)) {
   163  		case "cancelled":
   164  			cfg.RetryOn[codes.Canceled] = true
   165  		case "deadline-exceeded":
   166  			cfg.RetryOn[codes.DeadlineExceeded] = true
   167  		case "internal":
   168  			cfg.RetryOn[codes.Internal] = true
   169  		case "resource-exhausted":
   170  			cfg.RetryOn[codes.ResourceExhausted] = true
   171  		case "unavailable":
   172  			cfg.RetryOn[codes.Unavailable] = true
   173  		}
   174  	}
   175  
   176  	if rp.NumRetries == nil {
   177  		cfg.NumRetries = 1
   178  	} else {
   179  		cfg.NumRetries = rp.GetNumRetries().Value
   180  		if cfg.NumRetries < 1 {
   181  			return nil, fmt.Errorf("retry_policy.num_retries = %v; must be >= 1", cfg.NumRetries)
   182  		}
   183  	}
   184  
   185  	backoff := rp.GetRetryBackOff()
   186  	if backoff == nil {
   187  		cfg.RetryBackoff.BaseInterval = 25 * time.Millisecond
   188  	} else {
   189  		cfg.RetryBackoff.BaseInterval = backoff.GetBaseInterval().AsDuration()
   190  		if cfg.RetryBackoff.BaseInterval <= 0 {
   191  			return nil, fmt.Errorf("retry_policy.base_interval = %v; must be > 0", cfg.RetryBackoff.BaseInterval)
   192  		}
   193  	}
   194  	if max := backoff.GetMaxInterval(); max == nil {
   195  		cfg.RetryBackoff.MaxInterval = 10 * cfg.RetryBackoff.BaseInterval
   196  	} else {
   197  		cfg.RetryBackoff.MaxInterval = max.AsDuration()
   198  		if cfg.RetryBackoff.MaxInterval <= 0 {
   199  			return nil, fmt.Errorf("retry_policy.max_interval = %v; must be > 0", cfg.RetryBackoff.MaxInterval)
   200  		}
   201  	}
   202  
   203  	if len(cfg.RetryOn) == 0 {
   204  		return &RetryConfig{}, nil
   205  	}
   206  	return cfg, nil
   207  }
   208  
   209  func routesProtoToSlice(routes []*v3routepb.Route, csps map[string]clusterspecifier.BalancerConfig) ([]*Route, map[string]bool, error) {
   210  	var routesRet []*Route
   211  	var cspNames = make(map[string]bool)
   212  	for _, r := range routes {
   213  		match := r.GetMatch()
   214  		if match == nil {
   215  			return nil, nil, fmt.Errorf("route %+v doesn't have a match", r)
   216  		}
   217  
   218  		if len(match.GetQueryParameters()) != 0 {
   219  			// Ignore route with query parameters.
   220  			logger.Warningf("Ignoring route %+v with query parameter matchers", r)
   221  			continue
   222  		}
   223  
   224  		pathSp := match.GetPathSpecifier()
   225  		if pathSp == nil {
   226  			return nil, nil, fmt.Errorf("route %+v doesn't have a path specifier", r)
   227  		}
   228  
   229  		var route Route
   230  		switch pt := pathSp.(type) {
   231  		case *v3routepb.RouteMatch_Prefix:
   232  			route.Prefix = &pt.Prefix
   233  		case *v3routepb.RouteMatch_Path:
   234  			route.Path = &pt.Path
   235  		case *v3routepb.RouteMatch_SafeRegex:
   236  			regex := pt.SafeRegex.GetRegex()
   237  			re, err := regexp.Compile(regex)
   238  			if err != nil {
   239  				return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex)
   240  			}
   241  			route.Regex = re
   242  		default:
   243  			return nil, nil, fmt.Errorf("route %+v has an unrecognized path specifier: %+v", r, pt)
   244  		}
   245  
   246  		if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil {
   247  			route.CaseInsensitive = !caseSensitive.Value
   248  		}
   249  
   250  		for _, h := range match.GetHeaders() {
   251  			var header HeaderMatcher
   252  			switch ht := h.GetHeaderMatchSpecifier().(type) {
   253  			case *v3routepb.HeaderMatcher_ExactMatch:
   254  				header.ExactMatch = &ht.ExactMatch
   255  			case *v3routepb.HeaderMatcher_SafeRegexMatch:
   256  				regex := ht.SafeRegexMatch.GetRegex()
   257  				re, err := regexp.Compile(regex)
   258  				if err != nil {
   259  					return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex)
   260  				}
   261  				header.RegexMatch = re
   262  			case *v3routepb.HeaderMatcher_RangeMatch:
   263  				header.RangeMatch = &Int64Range{
   264  					Start: ht.RangeMatch.Start,
   265  					End:   ht.RangeMatch.End,
   266  				}
   267  			case *v3routepb.HeaderMatcher_PresentMatch:
   268  				header.PresentMatch = &ht.PresentMatch
   269  			case *v3routepb.HeaderMatcher_PrefixMatch:
   270  				header.PrefixMatch = &ht.PrefixMatch
   271  			case *v3routepb.HeaderMatcher_SuffixMatch:
   272  				header.SuffixMatch = &ht.SuffixMatch
   273  			case *v3routepb.HeaderMatcher_StringMatch:
   274  				sm, err := matcher.StringMatcherFromProto(ht.StringMatch)
   275  				if err != nil {
   276  					return nil, nil, fmt.Errorf("route %+v has an invalid string matcher: %v", err, ht.StringMatch)
   277  				}
   278  				header.StringMatch = &sm
   279  			default:
   280  				return nil, nil, fmt.Errorf("route %+v has an unrecognized header matcher: %+v", r, ht)
   281  			}
   282  			header.Name = h.GetName()
   283  			invert := h.GetInvertMatch()
   284  			header.InvertMatch = &invert
   285  			route.Headers = append(route.Headers, &header)
   286  		}
   287  
   288  		if fr := match.GetRuntimeFraction(); fr != nil {
   289  			d := fr.GetDefaultValue()
   290  			n := d.GetNumerator()
   291  			switch d.GetDenominator() {
   292  			case v3typepb.FractionalPercent_HUNDRED:
   293  				n *= 10000
   294  			case v3typepb.FractionalPercent_TEN_THOUSAND:
   295  				n *= 100
   296  			case v3typepb.FractionalPercent_MILLION:
   297  			}
   298  			route.Fraction = &n
   299  		}
   300  
   301  		switch r.GetAction().(type) {
   302  		case *v3routepb.Route_Route:
   303  			route.WeightedClusters = make(map[string]WeightedCluster)
   304  			action := r.GetRoute()
   305  
   306  			// Hash Policies are only applicable for a Ring Hash LB.
   307  			hp, err := hashPoliciesProtoToSlice(action.HashPolicy)
   308  			if err != nil {
   309  				return nil, nil, err
   310  			}
   311  			route.HashPolicies = hp
   312  
   313  			switch a := action.GetClusterSpecifier().(type) {
   314  			case *v3routepb.RouteAction_Cluster:
   315  				route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
   316  			case *v3routepb.RouteAction_WeightedClusters:
   317  				wcs := a.WeightedClusters
   318  				var totalWeight uint64
   319  				for _, c := range wcs.Clusters {
   320  					w := c.GetWeight().GetValue()
   321  					if w == 0 {
   322  						continue
   323  					}
   324  					totalWeight += uint64(w)
   325  					if totalWeight > math.MaxUint32 {
   326  						return nil, nil, fmt.Errorf("xds: total weight of clusters exceeds MaxUint32")
   327  					}
   328  					wc := WeightedCluster{Weight: w}
   329  					cfgs, err := processHTTPFilterOverrides(c.GetTypedPerFilterConfig())
   330  					if err != nil {
   331  						return nil, nil, fmt.Errorf("route %+v, action %+v: %v", r, a, err)
   332  					}
   333  					wc.HTTPFilterConfigOverride = cfgs
   334  					route.WeightedClusters[c.GetName()] = wc
   335  				}
   336  				if totalWeight == 0 {
   337  					return nil, nil, fmt.Errorf("route %+v, action %+v, has no valid cluster in WeightedCluster action", r, a)
   338  				}
   339  			case *v3routepb.RouteAction_ClusterSpecifierPlugin:
   340  				// gRFC A28 was updated to say the following:
   341  				//
   342  				// The route’s action field must be route, and its
   343  				// cluster_specifier:
   344  				// - Can be Cluster
   345  				// - Can be Weighted_clusters
   346  				// - Can be unset or an unsupported field. The route containing
   347  				//   this action will be ignored.
   348  				//
   349  				// This means that if this env var is not set, we should treat
   350  				// it as if it we didn't know about the cluster_specifier_plugin
   351  				// at all.
   352  				if _, ok := csps[a.ClusterSpecifierPlugin]; !ok {
   353  					// "When processing RouteActions, if any action includes a
   354  					// cluster_specifier_plugin value that is not in
   355  					// RouteConfiguration.cluster_specifier_plugins, the
   356  					// resource will be NACKed." - RLS in xDS design
   357  					return nil, nil, fmt.Errorf("route %+v, action %+v, specifies a cluster specifier plugin %+v that is not in Route Configuration", r, a, a.ClusterSpecifierPlugin)
   358  				}
   359  				if csps[a.ClusterSpecifierPlugin] == nil {
   360  					logger.Warningf("Ignoring route %+v with optional and unsupported cluster specifier plugin %+v", r, a.ClusterSpecifierPlugin)
   361  					continue
   362  				}
   363  				cspNames[a.ClusterSpecifierPlugin] = true
   364  				route.ClusterSpecifierPlugin = a.ClusterSpecifierPlugin
   365  			default:
   366  				logger.Warningf("Ignoring route %+v with unknown ClusterSpecifier %+v", r, a)
   367  				continue
   368  			}
   369  
   370  			msd := action.GetMaxStreamDuration()
   371  			// Prefer grpc_timeout_header_max, if set.
   372  			dur := msd.GetGrpcTimeoutHeaderMax()
   373  			if dur == nil {
   374  				dur = msd.GetMaxStreamDuration()
   375  			}
   376  			if dur != nil {
   377  				d := dur.AsDuration()
   378  				route.MaxStreamDuration = &d
   379  			}
   380  
   381  			route.RetryConfig, err = generateRetryConfig(action.GetRetryPolicy())
   382  			if err != nil {
   383  				return nil, nil, fmt.Errorf("route %+v, action %+v: %v", r, action, err)
   384  			}
   385  
   386  			route.ActionType = RouteActionRoute
   387  
   388  		case *v3routepb.Route_NonForwardingAction:
   389  			// Expected to be used on server side.
   390  			route.ActionType = RouteActionNonForwardingAction
   391  		default:
   392  			route.ActionType = RouteActionUnsupported
   393  		}
   394  
   395  		cfgs, err := processHTTPFilterOverrides(r.GetTypedPerFilterConfig())
   396  		if err != nil {
   397  			return nil, nil, fmt.Errorf("route %+v: %v", r, err)
   398  		}
   399  		route.HTTPFilterConfigOverride = cfgs
   400  		routesRet = append(routesRet, &route)
   401  	}
   402  	return routesRet, cspNames, nil
   403  }
   404  
   405  func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy) ([]*HashPolicy, error) {
   406  	var hashPoliciesRet []*HashPolicy
   407  	for _, p := range policies {
   408  		policy := HashPolicy{Terminal: p.Terminal}
   409  		switch p.GetPolicySpecifier().(type) {
   410  		case *v3routepb.RouteAction_HashPolicy_Header_:
   411  			policy.HashPolicyType = HashPolicyTypeHeader
   412  			policy.HeaderName = p.GetHeader().GetHeaderName()
   413  			if rr := p.GetHeader().GetRegexRewrite(); rr != nil {
   414  				regex := rr.GetPattern().GetRegex()
   415  				re, err := regexp.Compile(regex)
   416  				if err != nil {
   417  					return nil, fmt.Errorf("hash policy %+v contains an invalid regex %q", p, regex)
   418  				}
   419  				policy.Regex = re
   420  				policy.RegexSubstitution = rr.GetSubstitution()
   421  			}
   422  		case *v3routepb.RouteAction_HashPolicy_FilterState_:
   423  			if p.GetFilterState().GetKey() != "io.grpc.channel_id" {
   424  				logger.Warningf("Ignoring hash policy %+v with invalid key for filter state policy %q", p, p.GetFilterState().GetKey())
   425  				continue
   426  			}
   427  			policy.HashPolicyType = HashPolicyTypeChannelID
   428  		default:
   429  			logger.Warningf("Ignoring unsupported hash policy %T", p.GetPolicySpecifier())
   430  			continue
   431  		}
   432  
   433  		hashPoliciesRet = append(hashPoliciesRet, &policy)
   434  	}
   435  	return hashPoliciesRet, nil
   436  }
   437  

View as plain text