...

Source file src/google.golang.org/grpc/service_config.go

Documentation: google.golang.org/grpc

     1  /*
     2   *
     3   * Copyright 2017 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpc
    20  
    21  import (
    22  	"encoding/json"
    23  	"errors"
    24  	"fmt"
    25  	"reflect"
    26  	"time"
    27  
    28  	"google.golang.org/grpc/balancer"
    29  	"google.golang.org/grpc/codes"
    30  	"google.golang.org/grpc/internal"
    31  	"google.golang.org/grpc/internal/balancer/gracefulswitch"
    32  	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    33  	"google.golang.org/grpc/serviceconfig"
    34  )
    35  
// maxInt is the largest value representable by the platform's int type. It is
// computed by shifting an all-ones uint right by one bit, which clears what
// would be the sign bit after conversion to int.
const maxInt = int(^uint(0) >> 1)
    37  
// MethodConfig defines the configuration recommended by the service providers
// for a particular method. It is an alias of
// internalserviceconfig.MethodConfig.
//
// Deprecated: Users should not use this struct. Service config should be received
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type MethodConfig = internalserviceconfig.MethodConfig
    45  
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
//
// Deprecated: Users should not use this struct. Service config should be received
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type ServiceConfig struct {
	serviceconfig.Config

	// lbConfig is the service config's load balancing configuration. It is
	// populated by parseServiceConfig with the result of
	// gracefulswitch.ParseConfig.
	lbConfig serviceconfig.LoadBalancingConfig

	// Methods contains a map for the methods in this service.  If there is an
	// exact match for a method (i.e. /service/method) in the map, use the
	// corresponding MethodConfig.  If there's no exact match, look for the
	// default config for the service (/service/) and use the corresponding
	// MethodConfig if it exists.  Otherwise, the method has no MethodConfig to
	// use.
	Methods map[string]MethodConfig

	// If a retryThrottlingPolicy is provided, gRPC will automatically throttle
	// retry attempts and hedged RPCs when the client's ratio of failures to
	// successes exceeds a threshold.
	//
	// For each server name, the gRPC client will maintain a token_count which is
	// initially set to maxTokens, and can take values between 0 and maxTokens.
	//
	// Every outgoing RPC (regardless of service or method invoked) will change
	// token_count as follows:
	//
	//   - Every failed RPC will decrement the token_count by 1.
	//   - Every successful RPC will increment the token_count by tokenRatio.
	//
	// If token_count is less than or equal to maxTokens / 2, then RPCs will not
	// be retried and hedged RPCs will not be sent.
	retryThrottling *retryThrottlingPolicy
	// healthCheckConfig must be set as one of the requirements to enable LB
	// channel health checking.
	healthCheckConfig *healthCheckConfig
	// rawJSONString stores the service config JSON string that was parsed into
	// this service config struct.
	rawJSONString string
}
    90  
// healthCheckConfig defines the go-native version of the LB channel health check config.
type healthCheckConfig struct {
	// ServiceName is the service name to use in the health-checking request.
	ServiceName string
}
    96  
// jsonRetryPolicy is the JSON-decoding shape of a method's retry policy.
// convertRetryPolicy validates it and converts it into an
// internalserviceconfig.RetryPolicy.
type jsonRetryPolicy struct {
	// MaxAttempts must be greater than 1 for the policy to be used;
	// convertRetryPolicy caps it at 5.
	MaxAttempts int
	// InitialBackoff must be positive for the policy to be used.
	InitialBackoff internalserviceconfig.Duration
	// MaxBackoff must be positive for the policy to be used.
	MaxBackoff internalserviceconfig.Duration
	// BackoffMultiplier is copied into the converted policy; a value that is
	// zero or negative causes the policy to be ignored.
	BackoffMultiplier float64
	// RetryableStatusCodes must be non-empty for the policy to be used; it is
	// converted into a set keyed by status code.
	RetryableStatusCodes []codes.Code
}
   104  
// retryThrottlingPolicy defines the go-native version of the retry throttling
// policy defined by the service config here:
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
type retryThrottlingPolicy struct {
	// The number of tokens starts at maxTokens. The token_count will always be
	// between 0 and maxTokens.
	//
	// This field is required and must be greater than zero. The validation in
	// parseServiceConfig additionally rejects values above 1000.
	MaxTokens float64
	// The amount of tokens to add on each successful RPC. Typically this will
	// be some number between 0 and 1, e.g., 0.1.
	//
	// This field is required and must be greater than zero. Up to 3 decimal
	// places are supported.
	TokenRatio float64
}
   121  
// jsonName is the JSON-decoding shape of one entry in a method config's name
// list. generatePath turns it into the key used in ServiceConfig.Methods.
type jsonName struct {
	// Service is the service name; it may be empty only if Method is empty too.
	Service string
	// Method is the method name; empty selects the whole service.
	Method string
}
   126  
var (
	// errDuplicatedName is returned by parseServiceConfig when the same
	// generated path appears more than once across the method configs.
	errDuplicatedName = errors.New("duplicated name")
	// errEmptyServiceNonEmptyMethod is returned by jsonName.generatePath when
	// a name sets a method without a service.
	errEmptyServiceNonEmptyMethod = errors.New("cannot combine empty 'service' and non-empty 'method'")
)
   131  
   132  func (j jsonName) generatePath() (string, error) {
   133  	if j.Service == "" {
   134  		if j.Method != "" {
   135  			return "", errEmptyServiceNonEmptyMethod
   136  		}
   137  		return "", nil
   138  	}
   139  	res := "/" + j.Service + "/"
   140  	if j.Method != "" {
   141  		res += j.Method
   142  	}
   143  	return res, nil
   144  }
   145  
// jsonMC is the JSON-decoding shape of one entry in the service config's
// method config list. Fields are pointers so that parseServiceConfig can tell
// an absent field (nil) from one that was set.
//
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonMC struct {
	// Name lists the methods this entry applies to; entries with a nil Name
	// are skipped by parseServiceConfig.
	Name                    *[]jsonName
	WaitForReady            *bool
	Timeout                 *internalserviceconfig.Duration
	// MaxRequestMessageBytes and MaxResponseMessageBytes are clamped to maxInt
	// by parseServiceConfig when they exceed it.
	MaxRequestMessageBytes  *int64
	MaxResponseMessageBytes *int64
	RetryPolicy             *jsonRetryPolicy
}
   155  
// jsonSC is the JSON-decoding shape of the top-level service config that
// parseServiceConfig unmarshals into before building a ServiceConfig.
//
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
	// LoadBalancingPolicy is only consulted when LoadBalancingConfig is nil.
	LoadBalancingPolicy *string
	// LoadBalancingConfig is kept as raw JSON and handed to
	// gracefulswitch.ParseConfig.
	LoadBalancingConfig *json.RawMessage
	MethodConfig        *[]jsonMC
	RetryThrottling     *retryThrottlingPolicy
	HealthCheckConfig   *healthCheckConfig
}
   164  
// init installs parseServiceConfig as the internal.ParseServiceConfig hook.
func init() {
	internal.ParseServiceConfig = parseServiceConfig
}
   168  func parseServiceConfig(js string) *serviceconfig.ParseResult {
   169  	if len(js) == 0 {
   170  		return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")}
   171  	}
   172  	var rsc jsonSC
   173  	err := json.Unmarshal([]byte(js), &rsc)
   174  	if err != nil {
   175  		logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
   176  		return &serviceconfig.ParseResult{Err: err}
   177  	}
   178  	sc := ServiceConfig{
   179  		Methods:           make(map[string]MethodConfig),
   180  		retryThrottling:   rsc.RetryThrottling,
   181  		healthCheckConfig: rsc.HealthCheckConfig,
   182  		rawJSONString:     js,
   183  	}
   184  	c := rsc.LoadBalancingConfig
   185  	if c == nil {
   186  		name := PickFirstBalancerName
   187  		if rsc.LoadBalancingPolicy != nil {
   188  			name = *rsc.LoadBalancingPolicy
   189  		}
   190  		if balancer.Get(name) == nil {
   191  			name = PickFirstBalancerName
   192  		}
   193  		cfg := []map[string]any{{name: struct{}{}}}
   194  		strCfg, err := json.Marshal(cfg)
   195  		if err != nil {
   196  			return &serviceconfig.ParseResult{Err: fmt.Errorf("unexpected error marshaling simple LB config: %w", err)}
   197  		}
   198  		r := json.RawMessage(strCfg)
   199  		c = &r
   200  	}
   201  	cfg, err := gracefulswitch.ParseConfig(*c)
   202  	if err != nil {
   203  		return &serviceconfig.ParseResult{Err: err}
   204  	}
   205  	sc.lbConfig = cfg
   206  
   207  	if rsc.MethodConfig == nil {
   208  		return &serviceconfig.ParseResult{Config: &sc}
   209  	}
   210  
   211  	paths := map[string]struct{}{}
   212  	for _, m := range *rsc.MethodConfig {
   213  		if m.Name == nil {
   214  			continue
   215  		}
   216  
   217  		mc := MethodConfig{
   218  			WaitForReady: m.WaitForReady,
   219  			Timeout:      (*time.Duration)(m.Timeout),
   220  		}
   221  		if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
   222  			logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
   223  			return &serviceconfig.ParseResult{Err: err}
   224  		}
   225  		if m.MaxRequestMessageBytes != nil {
   226  			if *m.MaxRequestMessageBytes > int64(maxInt) {
   227  				mc.MaxReqSize = newInt(maxInt)
   228  			} else {
   229  				mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes))
   230  			}
   231  		}
   232  		if m.MaxResponseMessageBytes != nil {
   233  			if *m.MaxResponseMessageBytes > int64(maxInt) {
   234  				mc.MaxRespSize = newInt(maxInt)
   235  			} else {
   236  				mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes))
   237  			}
   238  		}
   239  		for i, n := range *m.Name {
   240  			path, err := n.generatePath()
   241  			if err != nil {
   242  				logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
   243  				return &serviceconfig.ParseResult{Err: err}
   244  			}
   245  
   246  			if _, ok := paths[path]; ok {
   247  				err = errDuplicatedName
   248  				logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
   249  				return &serviceconfig.ParseResult{Err: err}
   250  			}
   251  			paths[path] = struct{}{}
   252  			sc.Methods[path] = mc
   253  		}
   254  	}
   255  
   256  	if sc.retryThrottling != nil {
   257  		if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
   258  			return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)}
   259  		}
   260  		if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
   261  			return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)}
   262  		}
   263  	}
   264  	return &serviceconfig.ParseResult{Config: &sc}
   265  }
   266  
   267  func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPolicy, err error) {
   268  	if jrp == nil {
   269  		return nil, nil
   270  	}
   271  
   272  	if jrp.MaxAttempts <= 1 ||
   273  		jrp.InitialBackoff <= 0 ||
   274  		jrp.MaxBackoff <= 0 ||
   275  		jrp.BackoffMultiplier <= 0 ||
   276  		len(jrp.RetryableStatusCodes) == 0 {
   277  		logger.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
   278  		return nil, nil
   279  	}
   280  
   281  	rp := &internalserviceconfig.RetryPolicy{
   282  		MaxAttempts:          jrp.MaxAttempts,
   283  		InitialBackoff:       time.Duration(jrp.InitialBackoff),
   284  		MaxBackoff:           time.Duration(jrp.MaxBackoff),
   285  		BackoffMultiplier:    jrp.BackoffMultiplier,
   286  		RetryableStatusCodes: make(map[codes.Code]bool),
   287  	}
   288  	if rp.MaxAttempts > 5 {
   289  		// TODO(retry): Make the max maxAttempts configurable.
   290  		rp.MaxAttempts = 5
   291  	}
   292  	for _, code := range jrp.RetryableStatusCodes {
   293  		rp.RetryableStatusCodes[code] = true
   294  	}
   295  	return rp, nil
   296  }
   297  
   298  func min(a, b *int) *int {
   299  	if *a < *b {
   300  		return a
   301  	}
   302  	return b
   303  }
   304  
   305  func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
   306  	if mcMax == nil && doptMax == nil {
   307  		return &defaultVal
   308  	}
   309  	if mcMax != nil && doptMax != nil {
   310  		return min(mcMax, doptMax)
   311  	}
   312  	if mcMax != nil {
   313  		return mcMax
   314  	}
   315  	return doptMax
   316  }
   317  
   318  func newInt(b int) *int {
   319  	return &b
   320  }
   321  
// init installs equalServiceConfig as the
// internal.EqualServiceConfigForTesting hook.
func init() {
	internal.EqualServiceConfigForTesting = equalServiceConfig
}
   325  
   326  // equalServiceConfig compares two configs. The rawJSONString field is ignored,
   327  // because they may diff in white spaces.
   328  //
   329  // If any of them is NOT *ServiceConfig, return false.
   330  func equalServiceConfig(a, b serviceconfig.Config) bool {
   331  	if a == nil && b == nil {
   332  		return true
   333  	}
   334  	aa, ok := a.(*ServiceConfig)
   335  	if !ok {
   336  		return false
   337  	}
   338  	bb, ok := b.(*ServiceConfig)
   339  	if !ok {
   340  		return false
   341  	}
   342  	aaRaw := aa.rawJSONString
   343  	aa.rawJSONString = ""
   344  	bbRaw := bb.rawJSONString
   345  	bb.rawJSONString = ""
   346  	defer func() {
   347  		aa.rawJSONString = aaRaw
   348  		bb.rawJSONString = bbRaw
   349  	}()
   350  	// Using reflect.DeepEqual instead of cmp.Equal because many balancer
   351  	// configs are unexported, and cmp.Equal cannot compare unexported fields
   352  	// from unexported structs.
   353  	return reflect.DeepEqual(aa, bb)
   354  }
   355  

View as plain text