...

Source file src/google.golang.org/grpc/interop/xds/custom_lb.go

Documentation: google.golang.org/grpc/interop/xds

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package xds contains various xds interop helpers for usage in interop tests.
    20  package xds
    21  
    22  import (
    23  	"encoding/json"
    24  	"fmt"
    25  	"sync"
    26  
    27  	"google.golang.org/grpc/balancer"
    28  	"google.golang.org/grpc/balancer/roundrobin"
    29  	"google.golang.org/grpc/internal/pretty"
    30  	"google.golang.org/grpc/metadata"
    31  	"google.golang.org/grpc/serviceconfig"
    32  )
    33  
// init registers the rpcBehaviorBB builder with the balancer package when
// this package is loaded.
func init() {
	balancer.Register(rpcBehaviorBB{})
}
    37  
// name is the policy name returned by rpcBehaviorBB.Name.
const name = "test.RpcBehaviorLoadBalancer"
    39  
// rpcBehaviorBB is the balancer builder registered in init. It builds
// rpcBehaviorLB balancers and parses their JSON configuration.
type rpcBehaviorBB struct{}
    41  
    42  func (rpcBehaviorBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
    43  	b := &rpcBehaviorLB{
    44  		ClientConn: cc,
    45  	}
    46  	// round_robin child to complete balancer tree with a usable leaf policy and
    47  	// have RPCs actually work.
    48  	builder := balancer.Get(roundrobin.Name)
    49  	if builder == nil {
    50  		// Shouldn't happen, defensive programming. Registered from import of
    51  		// roundrobin package.
    52  		return nil
    53  	}
    54  	rr := builder.Build(b, bOpts)
    55  	if rr == nil {
    56  		// Shouldn't happen, defensive programming.
    57  		return nil
    58  	}
    59  	b.Balancer = rr
    60  	return b
    61  }
    62  
    63  func (rpcBehaviorBB) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    64  	lbCfg := &lbConfig{}
    65  	if err := json.Unmarshal(s, lbCfg); err != nil {
    66  		return nil, fmt.Errorf("rpc-behavior-lb: unable to marshal lbConfig: %s, error: %v", string(s), err)
    67  	}
    68  	return lbCfg, nil
    69  
    70  }
    71  
// Name returns the package-level name constant
// ("test.RpcBehaviorLoadBalancer") that identifies this policy.
func (rpcBehaviorBB) Name() string {
	return name
}
    75  
// lbConfig is the configuration produced by rpcBehaviorBB.ParseConfig. It
// embeds serviceconfig.LoadBalancingConfig (excluded from JSON by the "-" tag)
// so that it can be returned as that interface. RPCBehavior is read from the
// "rpcBehavior" JSON key and is the value placed in the rpc-behavior metadata
// entry of pick results.
type lbConfig struct {
	serviceconfig.LoadBalancingConfig `json:"-"`
	RPCBehavior                       string `json:"rpcBehavior,omitempty"`
}
    80  
// rpcBehaviorLB is a load balancer that wraps a round robin balancer and
// appends the rpc-behavior metadata field to any metadata in pick results based
// on what is specified in configuration.
type rpcBehaviorLB struct {
	// embed a ClientConn to wrap only UpdateState() operation
	balancer.ClientConn
	// embed a Balancer to wrap only UpdateClientConnState() operation
	balancer.Balancer

	// mu guards cfg. cfg is the configuration most recently stored by
	// UpdateClientConnState and is nil until that method first succeeds in
	// type-asserting a config.
	mu  sync.Mutex
	cfg *lbConfig
}
    93  
    94  func (b *rpcBehaviorLB) UpdateClientConnState(s balancer.ClientConnState) error {
    95  	lbCfg, ok := s.BalancerConfig.(*lbConfig)
    96  	if !ok {
    97  		return fmt.Errorf("test.RpcBehaviorLoadBalancer:received config with unexpected type %T: %s", s.BalancerConfig, pretty.ToJSON(s.BalancerConfig))
    98  	}
    99  	b.mu.Lock()
   100  	b.cfg = lbCfg
   101  	b.mu.Unlock()
   102  	return b.Balancer.UpdateClientConnState(balancer.ClientConnState{
   103  		ResolverState: s.ResolverState,
   104  	})
   105  }
   106  
   107  func (b *rpcBehaviorLB) UpdateState(state balancer.State) {
   108  	b.mu.Lock()
   109  	rpcBehavior := b.cfg.RPCBehavior
   110  	b.mu.Unlock()
   111  
   112  	b.ClientConn.UpdateState(balancer.State{
   113  		ConnectivityState: state.ConnectivityState,
   114  		Picker:            newRPCBehaviorPicker(state.Picker, rpcBehavior),
   115  	})
   116  }
   117  
// rpcBehaviorPicker wraps a picker and adds the rpc-behavior metadata field
// into the child pick result's metadata.
type rpcBehaviorPicker struct {
	// childPicker is the wrapped picker that Pick delegates to.
	childPicker balancer.Picker
	// rpcBehavior is the value Pick uses for the "rpc-behavior" metadata key.
	rpcBehavior string
}
   124  
   125  // Pick appends the rpc-behavior metadata entry to the pick result of the child.
   126  func (p *rpcBehaviorPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
   127  	pr, err := p.childPicker.Pick(info)
   128  	if err != nil {
   129  		return balancer.PickResult{}, err
   130  	}
   131  	pr.Metadata = metadata.Join(pr.Metadata, metadata.Pairs("rpc-behavior", p.rpcBehavior))
   132  	return pr, nil
   133  }
   134  
   135  func newRPCBehaviorPicker(childPicker balancer.Picker, rpcBehavior string) *rpcBehaviorPicker {
   136  	return &rpcBehaviorPicker{
   137  		childPicker: childPicker,
   138  		rpcBehavior: rpcBehavior,
   139  	}
   140  }
   141  

View as plain text