...

Source file src/google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry/converter/converter.go

Documentation: google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry/converter

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package converter provides converters to convert proto load balancing
    20  // configuration, defined by the xDS API spec, to JSON load balancing
    21  // configuration. These converters are registered by proto type in a registry,
    22  // which gets pulled from based off proto type passed in.
    23  package converter
    24  
    25  import (
    26  	"encoding/json"
    27  	"fmt"
    28  	"strings"
    29  
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/balancer"
    32  	"google.golang.org/grpc/balancer/leastrequest"
    33  	"google.golang.org/grpc/balancer/roundrobin"
    34  	"google.golang.org/grpc/balancer/weightedroundrobin"
    35  	"google.golang.org/grpc/internal/envconfig"
    36  	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    37  	"google.golang.org/grpc/xds/internal/balancer/ringhash"
    38  	"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
    39  	"google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry"
    40  	"google.golang.org/protobuf/proto"
    41  	"google.golang.org/protobuf/types/known/structpb"
    42  
    43  	v1xdsudpatypepb "github.com/cncf/xds/go/udpa/type/v1"
    44  	v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
    45  	v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
    46  	v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
    47  	v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3"
    48  	v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
    49  	v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
    50  )
    51  
    52  func init() {
    53  	xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin", convertWeightedRoundRobinProtoToServiceConfig)
    54  	xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash", convertRingHashProtoToServiceConfig)
    55  	xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst", convertPickFirstProtoToServiceConfig)
    56  	xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin", convertRoundRobinProtoToServiceConfig)
    57  	xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality", convertWRRLocalityProtoToServiceConfig)
    58  	xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest", convertLeastRequestProtoToServiceConfig)
    59  	xdslbregistry.Register("type.googleapis.com/udpa.type.v1.TypedStruct", convertV1TypedStructToServiceConfig)
    60  	xdslbregistry.Register("type.googleapis.com/xds.type.v3.TypedStruct", convertV3TypedStructToServiceConfig)
    61  }
    62  
    63  const (
    64  	defaultRingHashMinSize         = 1024
    65  	defaultRingHashMaxSize         = 8 * 1024 * 1024 // 8M
    66  	defaultLeastRequestChoiceCount = 2
    67  )
    68  
    69  func convertRingHashProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
    70  	rhProto := &v3ringhashpb.RingHash{}
    71  	if err := proto.Unmarshal(rawProto, rhProto); err != nil {
    72  		return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
    73  	}
    74  	if rhProto.GetHashFunction() != v3ringhashpb.RingHash_XX_HASH {
    75  		return nil, fmt.Errorf("unsupported ring_hash hash function %v", rhProto.GetHashFunction())
    76  	}
    77  
    78  	var minSize, maxSize uint64 = defaultRingHashMinSize, defaultRingHashMaxSize
    79  	if min := rhProto.GetMinimumRingSize(); min != nil {
    80  		minSize = min.GetValue()
    81  	}
    82  	if max := rhProto.GetMaximumRingSize(); max != nil {
    83  		maxSize = max.GetValue()
    84  	}
    85  
    86  	rhCfg := &ringhash.LBConfig{
    87  		MinRingSize: minSize,
    88  		MaxRingSize: maxSize,
    89  	}
    90  
    91  	rhCfgJSON, err := json.Marshal(rhCfg)
    92  	if err != nil {
    93  		return nil, fmt.Errorf("error marshaling JSON for type %T: %v", rhCfg, err)
    94  	}
    95  	return makeBalancerConfigJSON(ringhash.Name, rhCfgJSON), nil
    96  }
    97  
    98  type pfConfig struct {
    99  	ShuffleAddressList bool `json:"shuffleAddressList"`
   100  }
   101  
   102  func convertPickFirstProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
   103  	pfProto := &v3pickfirstpb.PickFirst{}
   104  	if err := proto.Unmarshal(rawProto, pfProto); err != nil {
   105  		return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
   106  	}
   107  
   108  	pfCfg := &pfConfig{ShuffleAddressList: pfProto.GetShuffleAddressList()}
   109  	js, err := json.Marshal(pfCfg)
   110  	if err != nil {
   111  		return nil, fmt.Errorf("error marshaling JSON for type %T: %v", pfCfg, err)
   112  	}
   113  	return makeBalancerConfigJSON(grpc.PickFirstBalancerName, js), nil
   114  }
   115  
   116  func convertRoundRobinProtoToServiceConfig([]byte, int) (json.RawMessage, error) {
   117  	return makeBalancerConfigJSON(roundrobin.Name, json.RawMessage("{}")), nil
   118  }
   119  
   120  type wrrLocalityLBConfig struct {
   121  	ChildPolicy json.RawMessage `json:"childPolicy,omitempty"`
   122  }
   123  
   124  func convertWRRLocalityProtoToServiceConfig(rawProto []byte, depth int) (json.RawMessage, error) {
   125  	wrrlProto := &v3wrrlocalitypb.WrrLocality{}
   126  	if err := proto.Unmarshal(rawProto, wrrlProto); err != nil {
   127  		return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
   128  	}
   129  	epJSON, err := xdslbregistry.ConvertToServiceConfig(wrrlProto.GetEndpointPickingPolicy(), depth+1)
   130  	if err != nil {
   131  		return nil, fmt.Errorf("error converting endpoint picking policy: %v for %+v", err, wrrlProto)
   132  	}
   133  	wrrLCfg := wrrLocalityLBConfig{
   134  		ChildPolicy: epJSON,
   135  	}
   136  
   137  	lbCfgJSON, err := json.Marshal(wrrLCfg)
   138  	if err != nil {
   139  		return nil, fmt.Errorf("error marshaling JSON for type %T: %v", wrrLCfg, err)
   140  	}
   141  	return makeBalancerConfigJSON(wrrlocality.Name, lbCfgJSON), nil
   142  }
   143  
   144  func convertWeightedRoundRobinProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
   145  	cswrrProto := &v3clientsideweightedroundrobinpb.ClientSideWeightedRoundRobin{}
   146  	if err := proto.Unmarshal(rawProto, cswrrProto); err != nil {
   147  		return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
   148  	}
   149  	wrrLBCfg := &wrrLBConfig{}
   150  	// Only set fields if specified in proto. If not set, ParseConfig of the WRR
   151  	// will populate the config with defaults.
   152  	if enableOOBLoadReportCfg := cswrrProto.GetEnableOobLoadReport(); enableOOBLoadReportCfg != nil {
   153  		wrrLBCfg.EnableOOBLoadReport = enableOOBLoadReportCfg.GetValue()
   154  	}
   155  	if oobReportingPeriodCfg := cswrrProto.GetOobReportingPeriod(); oobReportingPeriodCfg != nil {
   156  		wrrLBCfg.OOBReportingPeriod = internalserviceconfig.Duration(oobReportingPeriodCfg.AsDuration())
   157  	}
   158  	if blackoutPeriodCfg := cswrrProto.GetBlackoutPeriod(); blackoutPeriodCfg != nil {
   159  		wrrLBCfg.BlackoutPeriod = internalserviceconfig.Duration(blackoutPeriodCfg.AsDuration())
   160  	}
   161  	if weightExpirationPeriodCfg := cswrrProto.GetBlackoutPeriod(); weightExpirationPeriodCfg != nil {
   162  		wrrLBCfg.WeightExpirationPeriod = internalserviceconfig.Duration(weightExpirationPeriodCfg.AsDuration())
   163  	}
   164  	if weightUpdatePeriodCfg := cswrrProto.GetWeightUpdatePeriod(); weightUpdatePeriodCfg != nil {
   165  		wrrLBCfg.WeightUpdatePeriod = internalserviceconfig.Duration(weightUpdatePeriodCfg.AsDuration())
   166  	}
   167  	if errorUtilizationPenaltyCfg := cswrrProto.GetErrorUtilizationPenalty(); errorUtilizationPenaltyCfg != nil {
   168  		wrrLBCfg.ErrorUtilizationPenalty = float64(errorUtilizationPenaltyCfg.GetValue())
   169  	}
   170  
   171  	lbCfgJSON, err := json.Marshal(wrrLBCfg)
   172  	if err != nil {
   173  		return nil, fmt.Errorf("error marshaling JSON for type %T: %v", wrrLBCfg, err)
   174  	}
   175  	return makeBalancerConfigJSON(weightedroundrobin.Name, lbCfgJSON), nil
   176  }
   177  
   178  func convertLeastRequestProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
   179  	if !envconfig.LeastRequestLB {
   180  		return nil, nil
   181  	}
   182  	lrProto := &v3leastrequestpb.LeastRequest{}
   183  	if err := proto.Unmarshal(rawProto, lrProto); err != nil {
   184  		return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
   185  	}
   186  	// "The configuration for the Least Request LB policy is the
   187  	// least_request_lb_config field. The field is optional; if not present,
   188  	// defaults will be assumed for all of its values." - A48
   189  	choiceCount := uint32(defaultLeastRequestChoiceCount)
   190  	if cc := lrProto.GetChoiceCount(); cc != nil {
   191  		choiceCount = cc.GetValue()
   192  	}
   193  	lrCfg := &leastrequest.LBConfig{ChoiceCount: choiceCount}
   194  	js, err := json.Marshal(lrCfg)
   195  	if err != nil {
   196  		return nil, fmt.Errorf("error marshaling JSON for type %T: %v", lrCfg, err)
   197  	}
   198  	return makeBalancerConfigJSON(leastrequest.Name, js), nil
   199  }
   200  
   201  func convertV1TypedStructToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
   202  	tsProto := &v1xdsudpatypepb.TypedStruct{}
   203  	if err := proto.Unmarshal(rawProto, tsProto); err != nil {
   204  		return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
   205  	}
   206  	return convertCustomPolicy(tsProto.GetTypeUrl(), tsProto.GetValue())
   207  }
   208  
   209  func convertV3TypedStructToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
   210  	tsProto := &v3xdsxdstypepb.TypedStruct{}
   211  	if err := proto.Unmarshal(rawProto, tsProto); err != nil {
   212  		return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
   213  	}
   214  	return convertCustomPolicy(tsProto.GetTypeUrl(), tsProto.GetValue())
   215  }
   216  
   217  // convertCustomPolicy attempts to prepare json configuration for a custom lb
   218  // proto, which specifies the gRPC balancer type and configuration. Returns the
   219  // converted json and an error which should cause caller to error if error
   220  // converting. If both json and error returned are nil, it means the gRPC
   221  // Balancer registry does not contain that balancer type, and the caller should
   222  // continue to the next policy.
   223  func convertCustomPolicy(typeURL string, s *structpb.Struct) (json.RawMessage, error) {
   224  	// The gRPC policy name will be the "type name" part of the value of the
   225  	// type_url field in the TypedStruct. We get this by using the part after
   226  	// the last / character. Can assume a valid type_url from the control plane.
   227  	pos := strings.LastIndex(typeURL, "/")
   228  	name := typeURL[pos+1:]
   229  
   230  	if balancer.Get(name) == nil {
   231  		return nil, nil
   232  	}
   233  
   234  	rawJSON, err := json.Marshal(s)
   235  	if err != nil {
   236  		return nil, fmt.Errorf("error converting custom lb policy %v: %v for %+v", err, typeURL, s)
   237  	}
   238  
   239  	// The Struct contained in the TypedStruct will be returned as-is as the
   240  	// configuration JSON object.
   241  	return makeBalancerConfigJSON(name, rawJSON), nil
   242  }
   243  
   244  type wrrLBConfig struct {
   245  	EnableOOBLoadReport     bool                           `json:"enableOobLoadReport,omitempty"`
   246  	OOBReportingPeriod      internalserviceconfig.Duration `json:"oobReportingPeriod,omitempty"`
   247  	BlackoutPeriod          internalserviceconfig.Duration `json:"blackoutPeriod,omitempty"`
   248  	WeightExpirationPeriod  internalserviceconfig.Duration `json:"weightExpirationPeriod,omitempty"`
   249  	WeightUpdatePeriod      internalserviceconfig.Duration `json:"weightUpdatePeriod,omitempty"`
   250  	ErrorUtilizationPenalty float64                        `json:"errorUtilizationPenalty,omitempty"`
   251  }
   252  
   253  func makeBalancerConfigJSON(name string, value json.RawMessage) []byte {
   254  	return []byte(fmt.Sprintf(`[{%q: %s}]`, name, value))
   255  }
   256  

View as plain text