...

Source file src/google.golang.org/grpc/xds/internal/balancer/wrrlocality/balancer.go

Documentation: google.golang.org/grpc/xds/internal/balancer/wrrlocality

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package wrrlocality provides an implementation of the wrr locality LB policy,
    20  // as defined in [A52 - xDS Custom LB Policies].
    21  //
    22  // [A52 - xDS Custom LB Policies]: https://github.com/grpc/proposal/blob/master/A52-xds-custom-lb-policies.md
    23  package wrrlocality
    24  
    25  import (
    26  	"encoding/json"
    27  	"errors"
    28  	"fmt"
    29  
    30  	"google.golang.org/grpc/balancer"
    31  	"google.golang.org/grpc/balancer/weightedtarget"
    32  	"google.golang.org/grpc/internal/grpclog"
    33  	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    34  	"google.golang.org/grpc/resolver"
    35  	"google.golang.org/grpc/serviceconfig"
    36  	"google.golang.org/grpc/xds/internal"
    37  )
    38  
    39  // Name is the name of wrr_locality balancer.
    40  const Name = "xds_wrr_locality_experimental"
    41  
    42  func init() {
    43  	balancer.Register(bb{})
    44  }
    45  
    46  type bb struct{}
    47  
    48  func (bb) Name() string {
    49  	return Name
    50  }
    51  
    52  // LBConfig is the config for the wrr locality balancer.
    53  type LBConfig struct {
    54  	serviceconfig.LoadBalancingConfig `json:"-"`
    55  	// ChildPolicy is the config for the child policy.
    56  	ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
    57  }
    58  
    59  // To plumb in a different child in tests.
    60  var weightedTargetName = weightedtarget.Name
    61  
    62  func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
    63  	builder := balancer.Get(weightedTargetName)
    64  	if builder == nil {
    65  		// Shouldn't happen, registered through imported weighted target,
    66  		// defensive programming.
    67  		return nil
    68  	}
    69  
    70  	// Doesn't need to intercept any balancer.ClientConn operations; pass
    71  	// through by just giving cc to child balancer.
    72  	wtb := builder.Build(cc, bOpts)
    73  	if wtb == nil {
    74  		// shouldn't happen, defensive programming.
    75  		return nil
    76  	}
    77  	wtbCfgParser, ok := builder.(balancer.ConfigParser)
    78  	if !ok {
    79  		// Shouldn't happen, imported weighted target builder has this method.
    80  		return nil
    81  	}
    82  	wrrL := &wrrLocalityBalancer{
    83  		child:       wtb,
    84  		childParser: wtbCfgParser,
    85  	}
    86  
    87  	wrrL.logger = prefixLogger(wrrL)
    88  	wrrL.logger.Infof("Created")
    89  	return wrrL
    90  }
    91  
    92  func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    93  	var lbCfg *LBConfig
    94  	if err := json.Unmarshal(s, &lbCfg); err != nil {
    95  		return nil, fmt.Errorf("xds_wrr_locality: invalid LBConfig: %s, error: %v", string(s), err)
    96  	}
    97  	if lbCfg == nil || lbCfg.ChildPolicy == nil {
    98  		return nil, errors.New("xds_wrr_locality: invalid LBConfig: child policy field must be set")
    99  	}
   100  	return lbCfg, nil
   101  }
   102  
   103  type attributeKey struct{}
   104  
   105  // Equal allows the values to be compared by Attributes.Equal.
   106  func (a AddrInfo) Equal(o any) bool {
   107  	oa, ok := o.(AddrInfo)
   108  	return ok && oa.LocalityWeight == a.LocalityWeight
   109  }
   110  
   111  // AddrInfo is the locality weight of the locality an address is a part of.
   112  type AddrInfo struct {
   113  	LocalityWeight uint32
   114  }
   115  
   116  // SetAddrInfo returns a copy of addr in which the BalancerAttributes field is
   117  // updated with AddrInfo.
   118  func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
   119  	addr.BalancerAttributes = addr.BalancerAttributes.WithValue(attributeKey{}, addrInfo)
   120  	return addr
   121  }
   122  
   123  func (a AddrInfo) String() string {
   124  	return fmt.Sprintf("Locality Weight: %d", a.LocalityWeight)
   125  }
   126  
   127  // getAddrInfo returns the AddrInfo stored in the BalancerAttributes field of
   128  // addr. Returns false if no AddrInfo found.
   129  func getAddrInfo(addr resolver.Address) (AddrInfo, bool) {
   130  	v := addr.BalancerAttributes.Value(attributeKey{})
   131  	ai, ok := v.(AddrInfo)
   132  	return ai, ok
   133  }
   134  
   135  // wrrLocalityBalancer wraps a weighted target balancer, and builds
   136  // configuration for the weighted target once it receives configuration
   137  // specifying the weighted target child balancer and locality weight
   138  // information.
   139  type wrrLocalityBalancer struct {
   140  	// child will be a weighted target balancer, and will be built it at
   141  	// wrrLocalityBalancer build time. Other than preparing configuration, other
   142  	// balancer operations are simply pass through.
   143  	child balancer.Balancer
   144  
   145  	childParser balancer.ConfigParser
   146  
   147  	logger *grpclog.PrefixLogger
   148  }
   149  
   150  func (b *wrrLocalityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
   151  	lbCfg, ok := s.BalancerConfig.(*LBConfig)
   152  	if !ok {
   153  		b.logger.Errorf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
   154  		return balancer.ErrBadResolverState
   155  	}
   156  
   157  	weightedTargets := make(map[string]weightedtarget.Target)
   158  	for _, addr := range s.ResolverState.Addresses {
   159  		// This get of LocalityID could potentially return a zero value. This
   160  		// shouldn't happen though (this attribute that is set actually gets
   161  		// used to build localities in the first place), and thus don't error
   162  		// out, and just build a weighted target with undefined behavior.
   163  		locality, err := internal.GetLocalityID(addr).ToString()
   164  		if err != nil {
   165  			// Should never happen.
   166  			logger.Errorf("Failed to marshal LocalityID: %v, skipping this locality in weighted target")
   167  		}
   168  		ai, ok := getAddrInfo(addr)
   169  		if !ok {
   170  			return fmt.Errorf("xds_wrr_locality: missing locality weight information in address %q", addr)
   171  		}
   172  		weightedTargets[locality] = weightedtarget.Target{Weight: ai.LocalityWeight, ChildPolicy: lbCfg.ChildPolicy}
   173  	}
   174  	wtCfg := &weightedtarget.LBConfig{Targets: weightedTargets}
   175  	wtCfgJSON, err := json.Marshal(wtCfg)
   176  	if err != nil {
   177  		// Shouldn't happen.
   178  		return fmt.Errorf("xds_wrr_locality: error marshalling prepared config: %v", wtCfg)
   179  	}
   180  	var sc serviceconfig.LoadBalancingConfig
   181  	if sc, err = b.childParser.ParseConfig(wtCfgJSON); err != nil {
   182  		return fmt.Errorf("xds_wrr_locality: config generated %v is invalid: %v", wtCfgJSON, err)
   183  	}
   184  
   185  	return b.child.UpdateClientConnState(balancer.ClientConnState{
   186  		ResolverState:  s.ResolverState,
   187  		BalancerConfig: sc,
   188  	})
   189  }
   190  
   191  func (b *wrrLocalityBalancer) ResolverError(err error) {
   192  	b.child.ResolverError(err)
   193  }
   194  
   195  func (b *wrrLocalityBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
   196  	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
   197  }
   198  
   199  func (b *wrrLocalityBalancer) Close() {
   200  	b.child.Close()
   201  }
   202  

View as plain text