...

Source file src/google.golang.org/grpc/xds/internal/balancer/clustermanager/clustermanager.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clustermanager

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package clustermanager implements the cluster manager LB policy for xds.
    20  package clustermanager
    21  
    22  import (
    23  	"encoding/json"
    24  	"fmt"
    25  	"time"
    26  
    27  	"google.golang.org/grpc/balancer"
    28  	"google.golang.org/grpc/grpclog"
    29  	"google.golang.org/grpc/internal/balancergroup"
    30  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    31  	"google.golang.org/grpc/internal/hierarchy"
    32  	"google.golang.org/grpc/internal/pretty"
    33  	"google.golang.org/grpc/resolver"
    34  	"google.golang.org/grpc/serviceconfig"
    35  )
    36  
// balancerName is the LB policy name reported by bb.Name.
const balancerName = "xds_cluster_manager_experimental"
    38  
// init registers the cluster manager builder with the balancer package when
// this package is loaded.
func init() {
	balancer.Register(bb{})
}
    42  
// bb is the builder for the cluster manager balancer. It carries no state;
// all per-balancer state lives in the bal value returned by Build.
type bb struct{}
    44  
    45  func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    46  	b := &bal{}
    47  	b.logger = prefixLogger(b)
    48  	b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
    49  	b.stateAggregator.start()
    50  	b.bg = balancergroup.New(balancergroup.Options{
    51  		CC:                      cc,
    52  		BuildOpts:               opts,
    53  		StateAggregator:         b.stateAggregator,
    54  		Logger:                  b.logger,
    55  		SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
    56  	})
    57  	b.bg.Start()
    58  	b.logger.Infof("Created")
    59  	return b
    60  }
    61  
// Name returns the name of this LB policy (balancerName).
func (bb) Name() string {
	return balancerName
}
    65  
// ParseConfig parses the raw JSON LB config c by delegating to parseConfig
// and returns its result unchanged.
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
	return parseConfig(c)
}
    69  
// bal is the cluster manager balancer. It is created by bb.Build.
type bal struct {
	// logger prefixes every message with this balancer's address (see
	// prefixLogger).
	logger *internalgrpclog.PrefixLogger

	// TODO: make this package not dependent on xds specific code. Same as for
	// weighted target balancer.

	// bg holds the child policies; children are added, removed and updated
	// through it in updateChildren.
	bg *balancergroup.BalancerGroup
	// stateAggregator is handed to bg as its StateAggregator and is told
	// about every child that is added or removed.
	stateAggregator *balancerStateAggregator

	// children is the child configuration from the most recently applied
	// lbConfig, keyed by child name. It is nil until the first update.
	children map[string]childConfig
}
    80  
    81  func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) {
    82  	update := false
    83  	addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
    84  
    85  	// Remove sub-pickers and sub-balancers that are not in the new cluster list.
    86  	for name := range b.children {
    87  		if _, ok := newConfig.Children[name]; !ok {
    88  			b.stateAggregator.remove(name)
    89  			b.bg.Remove(name)
    90  			update = true
    91  		}
    92  	}
    93  
    94  	// For sub-balancers in the new cluster list,
    95  	// - add to balancer group if it's new,
    96  	// - forward the address/balancer config update.
    97  	for name, newT := range newConfig.Children {
    98  		if _, ok := b.children[name]; !ok {
    99  			// If this is a new sub-balancer, add it to the picker map.
   100  			b.stateAggregator.add(name)
   101  			// Then add to the balancer group.
   102  			b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
   103  		} else {
   104  			// Already present, check for type change and if so send down a new builder.
   105  			if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name {
   106  				b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name))
   107  			}
   108  		}
   109  		// TODO: handle error? How to aggregate errors and return?
   110  		_ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
   111  			ResolverState: resolver.State{
   112  				Addresses:     addressesSplit[name],
   113  				ServiceConfig: s.ResolverState.ServiceConfig,
   114  				Attributes:    s.ResolverState.Attributes,
   115  			},
   116  			BalancerConfig: newT.ChildPolicy.Config,
   117  		})
   118  	}
   119  
   120  	b.children = newConfig.Children
   121  	if update {
   122  		b.stateAggregator.buildAndUpdate()
   123  	}
   124  }
   125  
   126  func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error {
   127  	newConfig, ok := s.BalancerConfig.(*lbConfig)
   128  	if !ok {
   129  		return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
   130  	}
   131  	b.logger.Infof("update with config %+v, resolver state %+v", pretty.ToJSON(s.BalancerConfig), s.ResolverState)
   132  
   133  	b.stateAggregator.pauseStateUpdates()
   134  	defer b.stateAggregator.resumeStateUpdates()
   135  	b.updateChildren(s, newConfig)
   136  	return nil
   137  }
   138  
// ResolverError forwards the resolver error err to the balancer group.
func (b *bal) ResolverError(err error) {
	b.bg.ResolverError(err)
}
   142  
// UpdateSubConnState is not expected to be called on this balancer; it only
// logs an error with the received SubConn and state and takes no other action.
func (b *bal) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}
   146  
// Close shuts the balancer down: the state aggregator is closed first, then
// the balancer group, and finally a "Shutdown" message is logged.
func (b *bal) Close() {
	b.stateAggregator.close()
	b.bg.Close()
	b.logger.Infof("Shutdown")
}
   152  
// ExitIdle forwards the exit-idle request to the balancer group.
func (b *bal) ExitIdle() {
	b.bg.ExitIdle()
}
   156  
// prefix is the fmt format string for this balancer's log prefix; the %p verb
// is filled with the *bal pointer in prefixLogger.
const prefix = "[xds-cluster-manager-lb %p] "
   158  
// logger is the package-level logger for the "xds" grpclog component; it is
// the base logger wrapped by prefixLogger.
var logger = grpclog.Component("xds")
   160  
   161  func prefixLogger(p *bal) *internalgrpclog.PrefixLogger {
   162  	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
   163  }
   164  

View as plain text