
Source file src/google.golang.org/grpc/xds/internal/resolver/serviceconfig.go

Documentation: google.golang.org/grpc/xds/internal/resolver

/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package resolver

import (
	"context"
	"encoding/json"
	"fmt"
	"math/bits"
	"strings"
	"sync/atomic"
	"time"

	xxhash "github.com/cespare/xxhash/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcutil"
	iresolver "google.golang.org/grpc/internal/resolver"
	"google.golang.org/grpc/internal/serviceconfig"
	"google.golang.org/grpc/internal/wrr"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/xds/internal/balancer/clustermanager"
	"google.golang.org/grpc/xds/internal/balancer/ringhash"
	"google.golang.org/grpc/xds/internal/httpfilter"
	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

const (
	cdsName                      = "cds_experimental"
	xdsClusterManagerName        = "xds_cluster_manager_experimental"
	clusterPrefix                = "cluster:"
	clusterSpecifierPluginPrefix = "cluster_specifier_plugin:"
)

// serviceConfig is the service config emitted by the xDS resolver.
type serviceConfig struct {
	LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"`
}

// balancerConfig is a list of single-entry maps from LB policy name to its
// config, matching the service config JSON representation.
type balancerConfig []map[string]any

func newBalancerConfig(name string, config any) balancerConfig {
	return []map[string]any{{name: config}}
}
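
// For example, newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: "c"})
// marshals to:
//
//	[{"cds_experimental": {"cluster": "c"}}]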

// cdsBalancerConfig is the config for the cds LB policy.
type cdsBalancerConfig struct {
	Cluster string `json:"cluster"`
}

// xdsChildConfig is the child config of the xds_cluster_manager LB policy for
// a single cluster.
type xdsChildConfig struct {
	ChildPolicy balancerConfig `json:"childPolicy"`
}

// xdsClusterManagerConfig is the config for the xds_cluster_manager LB
// policy, keyed by cluster entry name.
type xdsClusterManagerConfig struct {
	Children map[string]xdsChildConfig `json:"children"`
}

// serviceConfigJSON produces a service config in JSON format representing all
// the clusters referenced in activeClusters.  This includes clusters with
// zero references, so they must be pruned first.
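//
// For example, with a single active cluster entry "cluster:A" whose child
// config points the cds policy at cluster "A", the output is (formatted here
// for readability):
//
//	{
//	  "loadBalancingConfig": [{
//	    "xds_cluster_manager_experimental": {
//	      "children": {
//	        "cluster:A": {
//	          "childPolicy": [{"cds_experimental": {"cluster": "A"}}]
//	        }
//	      }
//	    }
//	  }]
//	}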
func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
	// Generate children (all entries in activeClusters).
	children := make(map[string]xdsChildConfig)
	for cluster, ci := range activeClusters {
		children[cluster] = ci.cfg
	}

	sc := serviceConfig{
		LoadBalancingConfig: newBalancerConfig(
			xdsClusterManagerName, xdsClusterManagerConfig{Children: children},
		),
	}

	bs, err := json.Marshal(sc)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal json: %v", err)
	}
	return bs, nil
}

// virtualHost holds the virtual-host-level configuration that applies to all
// routes within it.
type virtualHost struct {
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
	// retry policy present in virtual host
	retryConfig *xdsresource.RetryConfig
}

// routeCluster holds information about a cluster as referenced by a route.
type routeCluster struct {
	name string
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
}

// route is a single route from the xDS route configuration, along with the
// data needed to act on a match.
type route struct {
	m                 *xdsresource.CompositeMatcher // converted from route matchers
	actionType        xdsresource.RouteActionType   // holds route action type
	clusters          wrr.WRR                       // holds *routeCluster entries
	maxStreamDuration time.Duration
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
	retryConfig              *xdsresource.RetryConfig
	hashPolicies             []*xdsresource.HashPolicy
}

func (r route) String() string {
	return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
}

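// configSelector selects the cluster and per-RPC configuration for each RPC,
// based on the routing state from the most recent accepted xDS update.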
type configSelector struct {
	r                *xdsResolver
	virtualHost      virtualHost
	routes           []route
	clusters         map[string]*clusterInfo
	httpFilterConfig []xdsresource.HTTPFilter
}

var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")
var errUnsupportedClientRouteAction = status.Errorf(codes.Unavailable, "matched route does not have a supported route action type")

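// SelectConfig implements iresolver.ConfigSelector.  It matches the RPC
// against the routes in order, picks a cluster from the first matching
// route's weighted clusters, and returns the RPC's configuration, holding a
// reference on the cluster until the RPC is committed.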
func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
	if cs == nil {
		return nil, status.Errorf(codes.Unavailable, "no valid clusters")
	}
	var rt *route
	// Loop through routes in order and select first match.
	for _, r := range cs.routes {
		if r.m.Match(rpcInfo) {
			rt = &r
			break
		}
	}

	if rt == nil || rt.clusters == nil {
		return nil, errNoMatchedRouteFound
	}

	if rt.actionType != xdsresource.RouteActionRoute {
		return nil, errUnsupportedClientRouteAction
	}

	cluster, ok := rt.clusters.Next().(*routeCluster)
	if !ok {
		return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
	}

	// Add a ref to the selected cluster, as this RPC needs this cluster until
	// it is committed.
	ref := &cs.clusters[cluster.name].refCount
	atomic.AddInt32(ref, 1)

	interceptor, err := cs.newInterceptor(rt, cluster)
	if err != nil {
		return nil, err
	}

	lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
	lbCtx = ringhash.SetRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))

	config := &iresolver.RPCConfig{
		// Communicate the picked cluster, and the request hash for use by the
		// ring_hash LB policy, to the LB policies via the context.
		Context: lbCtx,
		OnCommitted: func() {
			// When the RPC is committed, the cluster is no longer required.
			// Decrease its ref.
			if v := atomic.AddInt32(ref, -1); v == 0 {
				// This entry will be removed from activeClusters when
				// producing the service config for the empty update.
				cs.r.serializer.Schedule(func(context.Context) {
					cs.r.onClusterRefDownToZero()
				})
			}
		},
		Interceptor: interceptor,
	}

	if rt.maxStreamDuration != 0 {
		config.MethodConfig.Timeout = &rt.maxStreamDuration
	}
	if rt.retryConfig != nil {
		config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
	} else if cs.virtualHost.retryConfig != nil {
		config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
	}

	return config, nil
}

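// retryConfigToPolicy converts an xDS retry policy into the gRPC service
// config representation.  MaxAttempts includes the original attempt, so it is
// NumRetries+1: e.g. NumRetries=2 permits three total attempts.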
func retryConfigToPolicy(config *xdsresource.RetryConfig) *serviceconfig.RetryPolicy {
	return &serviceconfig.RetryPolicy{
		MaxAttempts:          int(config.NumRetries) + 1,
		InitialBackoff:       config.RetryBackoff.BaseInterval,
		MaxBackoff:           config.RetryBackoff.MaxInterval,
		BackoffMultiplier:    2,
		RetryableStatusCodes: config.RetryOn,
	}
}

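// generateHash computes the hash used by the ring_hash LB policy to pick a
// backend for this RPC, by evaluating the route's hash policies in order and
// combining their results.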
func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*xdsresource.HashPolicy) uint64 {
	var hash uint64
	var generatedHash bool
	var md, emd metadata.MD
	var mdRead bool
	for _, policy := range hashPolicies {
		var policyHash uint64
		var generatedPolicyHash bool
		switch policy.HashPolicyType {
		case xdsresource.HashPolicyTypeHeader:
			// Skip binary-valued headers (suffix "-bin").
			if strings.HasSuffix(policy.HeaderName, "-bin") {
				continue
			}
			if !mdRead {
				md, _ = metadata.FromOutgoingContext(rpcInfo.Context)
				emd, _ = grpcutil.ExtraMetadata(rpcInfo.Context)
				mdRead = true
			}
			values := emd.Get(policy.HeaderName)
			if len(values) == 0 {
				// Extra metadata (e.g. the "content-type" header) takes
				// precedence over the user's metadata, so fall back to the
				// latter only when the header is absent from the former.
				values = md.Get(policy.HeaderName)
				if len(values) == 0 {
					// If the header isn't present at all, this policy is a no-op.
					continue
				}
			}
			joinedValues := strings.Join(values, ",")
			if policy.Regex != nil {
				joinedValues = policy.Regex.ReplaceAllString(joinedValues, policy.RegexSubstitution)
			}
			policyHash = xxhash.Sum64String(joinedValues)
			generatedHash = true
			generatedPolicyHash = true
		case xdsresource.HashPolicyTypeChannelID:
			// Use the static channel ID as the hash for this policy.
			policyHash = cs.r.channelID
			generatedHash = true
			generatedPolicyHash = true
		}

		// Deterministically combine the hash policies. Rotating prevents
		// duplicate hash policies from cancelling each other out and preserves
		// the 64 bits of entropy.
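		// For example, two identical header policies each producing hash h
		// combine to RotateLeft64(h, 1) ^ h rather than h ^ h == 0.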
		if generatedPolicyHash {
			hash = bits.RotateLeft64(hash, 1)
			hash = hash ^ policyHash
		}

		// If the policy is terminal and a hash has already been generated,
		// ignore the rest of the policies and use the hash generated so far.
		if policy.Terminal && generatedHash {
			break
		}
	}

	if generatedHash {
		return hash
	}
	// If no policy generated a hash, return a random value; this effectively
	// routes the request to a random backend.
	return grpcrand.Uint64()
}

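// newInterceptor builds the interceptor chain for an RPC from the configured
// HTTP filters, resolving each filter's config override with cluster-level
// overrides taking precedence over route-level, and route-level over
// virtual-host-level.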
func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
	if len(cs.httpFilterConfig) == 0 {
		return nil, nil
	}
	interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
	for _, filter := range cs.httpFilterConfig {
		override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
		if override == nil {
			override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
		}
		if override == nil {
			override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
		}
		ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
		if !ok {
			// Should not happen if it passed xdsClient validation.
			return nil, fmt.Errorf("filter does not support use in client")
		}
		i, err := ib.BuildClientInterceptor(filter.Config, override)
		if err != nil {
			return nil, fmt.Errorf("error constructing filter: %v", err)
		}
		if i != nil {
			interceptors = append(interceptors, i)
		}
	}
	return &interceptorList{interceptors: interceptors}, nil
}

// stop decrements refs of all clusters referenced by this config selector.
func (cs *configSelector) stop() {
	// The resolver's old configSelector may be nil.  Handle that here.
	if cs == nil {
		return
	}
	// If any refs drop to zero, we'll need a service config update to delete
	// the cluster.
	needUpdate := false
	// Loops over cs.clusters, but these are pointers to entries in
	// activeClusters.
	for _, ci := range cs.clusters {
		if v := atomic.AddInt32(&ci.refCount, -1); v == 0 {
			needUpdate = true
		}
	}
	// We stop the old config selector immediately after sending a new config
	// selector; we need another update to delete clusters from the config (if
	// we don't have another update pending already).
	if needUpdate {
		cs.r.serializer.Schedule(func(context.Context) {
			cs.r.onClusterRefDownToZero()
		})
	}
}

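// interceptorList is an iresolver.ClientInterceptor that chains a list of
// interceptors, invoking them in order for each new stream.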
type interceptorList struct {
	interceptors []iresolver.ClientInterceptor
}

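// NewStream runs the RPC through the interceptor chain.  The chain is built
// by wrapping newStream from the last interceptor to the first, so the first
// interceptor in the list is the outermost and is invoked first.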
func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
	for i := len(il.interceptors) - 1; i >= 0; i-- {
		ns := newStream // capture the current value of newStream for the closure
		interceptor := il.interceptors[i]
		newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
			return interceptor.NewStream(ctx, ri, done, ns)
		}
	}
	return newStream(ctx, func() {})
}
