...

Source file src/google.golang.org/grpc/balancer/weightedtarget/weightedtarget.go

Documentation: google.golang.org/grpc/balancer/weightedtarget

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package weightedtarget implements the weighted_target balancer.
    20  //
    21  // All APIs in this package are experimental.
    22  package weightedtarget
    23  
    24  import (
    25  	"encoding/json"
    26  	"fmt"
    27  	"time"
    28  
    29  	"google.golang.org/grpc/balancer"
    30  	"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
    31  	"google.golang.org/grpc/internal/balancergroup"
    32  	"google.golang.org/grpc/internal/grpclog"
    33  	"google.golang.org/grpc/internal/hierarchy"
    34  	"google.golang.org/grpc/internal/pretty"
    35  	"google.golang.org/grpc/internal/wrr"
    36  	"google.golang.org/grpc/resolver"
    37  	"google.golang.org/grpc/serviceconfig"
    38  )
    39  
    40  // Name is the name of the weighted_target balancer.
    41  const Name = "weighted_target_experimental"
    42  
    43  // NewRandomWRR is the WRR constructor used to pick sub-pickers from
    44  // sub-balancers. It's to be modified in tests.
    45  var NewRandomWRR = wrr.NewRandom
    46  
    47  func init() {
    48  	balancer.Register(bb{})
    49  }
    50  
    51  type bb struct{}
    52  
    53  func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
    54  	b := &weightedTargetBalancer{}
    55  	b.logger = prefixLogger(b)
    56  	b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR)
    57  	b.stateAggregator.Start()
    58  	b.bg = balancergroup.New(balancergroup.Options{
    59  		CC:                      cc,
    60  		BuildOpts:               bOpts,
    61  		StateAggregator:         b.stateAggregator,
    62  		Logger:                  b.logger,
    63  		SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
    64  	})
    65  	b.bg.Start()
    66  	b.logger.Infof("Created")
    67  	return b
    68  }
    69  
    70  func (bb) Name() string {
    71  	return Name
    72  }
    73  
    74  func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    75  	return parseConfig(c)
    76  }
    77  
    78  type weightedTargetBalancer struct {
    79  	logger *grpclog.PrefixLogger
    80  
    81  	bg              *balancergroup.BalancerGroup
    82  	stateAggregator *weightedaggregator.Aggregator
    83  
    84  	targets map[string]Target
    85  }
    86  
    87  // UpdateClientConnState takes the new targets in balancer group,
    88  // creates/deletes sub-balancers and sends them update. addresses are split into
    89  // groups based on hierarchy path.
    90  func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
    91  	b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
    92  	newConfig, ok := s.BalancerConfig.(*LBConfig)
    93  	if !ok {
    94  		return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
    95  	}
    96  	addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
    97  
    98  	b.stateAggregator.PauseStateUpdates()
    99  	defer b.stateAggregator.ResumeStateUpdates()
   100  
   101  	// Remove sub-pickers and sub-balancers that are not in the new config.
   102  	for name := range b.targets {
   103  		if _, ok := newConfig.Targets[name]; !ok {
   104  			b.stateAggregator.Remove(name)
   105  			b.bg.Remove(name)
   106  		}
   107  	}
   108  
   109  	// For sub-balancers in the new config
   110  	// - if it's new. add to balancer group,
   111  	// - if it's old, but has a new weight, update weight in balancer group.
   112  	//
   113  	// For all sub-balancers, forward the address/balancer config update.
   114  	for name, newT := range newConfig.Targets {
   115  		oldT, ok := b.targets[name]
   116  		if !ok {
   117  			// If this is a new sub-balancer, add weights to the picker map.
   118  			b.stateAggregator.Add(name, newT.Weight)
   119  			// Then add to the balancer group.
   120  			b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
   121  			// Not trigger a state/picker update. Wait for the new sub-balancer
   122  			// to send its updates.
   123  		} else if newT.ChildPolicy.Name != oldT.ChildPolicy.Name {
   124  			// If the child policy name is different, remove from balancer group
   125  			// and re-add.
   126  			b.stateAggregator.Remove(name)
   127  			b.bg.Remove(name)
   128  			b.stateAggregator.Add(name, newT.Weight)
   129  			b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
   130  		} else if newT.Weight != oldT.Weight {
   131  			// If this is an existing sub-balancer, update weight if necessary.
   132  			b.stateAggregator.UpdateWeight(name, newT.Weight)
   133  		}
   134  
   135  		// Forwards all the update:
   136  		// - addresses are from the map after splitting with hierarchy path,
   137  		// - Top level service config and attributes are the same,
   138  		// - Balancer config comes from the targets map.
   139  		//
   140  		// TODO: handle error? How to aggregate errors and return?
   141  		_ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
   142  			ResolverState: resolver.State{
   143  				Addresses:     addressesSplit[name],
   144  				ServiceConfig: s.ResolverState.ServiceConfig,
   145  				Attributes:    s.ResolverState.Attributes,
   146  			},
   147  			BalancerConfig: newT.ChildPolicy.Config,
   148  		})
   149  	}
   150  
   151  	b.targets = newConfig.Targets
   152  
   153  	// If the targets length is zero, it means we have removed all child
   154  	// policies from the balancer group and aggregator.
   155  	// At the start of this UpdateClientConnState() operation, a call to
   156  	// b.stateAggregator.ResumeStateUpdates() is deferred. Thus, setting the
   157  	// needUpdateStateOnResume bool to true here will ensure a new picker is
   158  	// built as part of that deferred function. Since there are now no child
   159  	// policies, the aggregated connectivity state reported form the Aggregator
   160  	// will be TRANSIENT_FAILURE.
   161  	if len(b.targets) == 0 {
   162  		b.stateAggregator.NeedUpdateStateOnResume()
   163  	}
   164  
   165  	return nil
   166  }
   167  
   168  func (b *weightedTargetBalancer) ResolverError(err error) {
   169  	b.bg.ResolverError(err)
   170  }
   171  
   172  func (b *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
   173  	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
   174  }
   175  
   176  func (b *weightedTargetBalancer) Close() {
   177  	b.stateAggregator.Stop()
   178  	b.bg.Close()
   179  }
   180  
   181  func (b *weightedTargetBalancer) ExitIdle() {
   182  	b.bg.ExitIdle()
   183  }
   184  

View as plain text