...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterimpl/clusterimpl.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterimpl

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package clusterimpl implements the xds_cluster_impl balancing policy. It
    20  // handles cluster-level features such as circuit breaking and RPC dropping.
    21  //
    22  // Note that it doesn't handle name resolution, which is done by the
    23  // xds_cluster_resolver policy.
    24  package clusterimpl
    25  
    26  import (
    27  	"encoding/json"
    28  	"fmt"
    29  	"sync"
    30  	"sync/atomic"
    31  
    32  	"google.golang.org/grpc/balancer"
    33  	"google.golang.org/grpc/connectivity"
    34  	"google.golang.org/grpc/internal/balancer/gracefulswitch"
    35  	"google.golang.org/grpc/internal/buffer"
    36  	"google.golang.org/grpc/internal/grpclog"
    37  	"google.golang.org/grpc/internal/grpcsync"
    38  	"google.golang.org/grpc/internal/pretty"
    39  	"google.golang.org/grpc/internal/xds"
    40  	"google.golang.org/grpc/internal/xds/bootstrap"
    41  	"google.golang.org/grpc/resolver"
    42  	"google.golang.org/grpc/serviceconfig"
    43  	xdsinternal "google.golang.org/grpc/xds/internal"
    44  	"google.golang.org/grpc/xds/internal/balancer/loadstore"
    45  	"google.golang.org/grpc/xds/internal/xdsclient"
    46  	"google.golang.org/grpc/xds/internal/xdsclient/load"
    47  )
    48  
    49  const (
    50  	// Name is the name of the cluster_impl balancer.
    51  	Name                   = "xds_cluster_impl_experimental"
    52  	defaultRequestCountMax = 1024
    53  )
    54  
    55  func init() {
    56  	balancer.Register(bb{})
    57  }
    58  
    59  type bb struct{}
    60  
    61  func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
    62  	b := &clusterImplBalancer{
    63  		ClientConn:      cc,
    64  		bOpts:           bOpts,
    65  		closed:          grpcsync.NewEvent(),
    66  		done:            grpcsync.NewEvent(),
    67  		loadWrapper:     loadstore.NewWrapper(),
    68  		pickerUpdateCh:  buffer.NewUnbounded(),
    69  		requestCountMax: defaultRequestCountMax,
    70  	}
    71  	b.logger = prefixLogger(b)
    72  	b.child = gracefulswitch.NewBalancer(b, bOpts)
    73  	go b.run()
    74  	b.logger.Infof("Created")
    75  	return b
    76  }
    77  
    78  func (bb) Name() string {
    79  	return Name
    80  }
    81  
    82  func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    83  	return parseConfig(c)
    84  }
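
Because bb registers itself under Name and also implements balancer.ConfigParser via ParseConfig above, a parent policy can look the builder up in the registry and parse its JSON configuration before pushing it down. The sketch below is illustrative only: the package is internal to the grpc module, so the blank import compiles only from inside that module, and the JSON field names are assumptions based on this package's LBConfig tags.

	package main

	import (
		"encoding/json"
		"fmt"

		"google.golang.org/grpc/balancer"
		// Blank import so that init() above registers the builder. This only
		// compiles inside the grpc module, because the package is internal.
		_ "google.golang.org/grpc/xds/internal/balancer/clusterimpl"
	)

	func main() {
		builder := balancer.Get("xds_cluster_impl_experimental")
		if builder == nil {
			panic("xds_cluster_impl_experimental is not registered")
		}
		parser, ok := builder.(balancer.ConfigParser)
		if !ok {
			panic("builder does not implement balancer.ConfigParser")
		}
		// Field names here are assumptions based on this package's LBConfig.
		cfg, err := parser.ParseConfig(json.RawMessage(`{
		  "cluster": "cluster_a",
		  "edsServiceName": "service_a",
		  "maxConcurrentRequests": 512
		}`))
		fmt.Printf("parsed config: %+v, err: %v\n", cfg, err)
	}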
    85  
    86  type clusterImplBalancer struct {
    87  	balancer.ClientConn
    88  
    89  	// mu guarantees mutual exclusion between Close() and the handling of
    90  	// picker updates to the parent ClientConn in run(). It makes sure that
    91  	// the run() goroutine does not send a picker update to the parent after
    92  	// the balancer is closed.
    93  	//
    94  	// Only the run() goroutine needs this lock; the other exported methods
    95  	// are already guaranteed to be synchronized with Close(), so they don't
    96  	// need it.
    97  	mu     sync.Mutex
    98  	closed *grpcsync.Event
    99  	done   *grpcsync.Event
   100  
   101  	bOpts     balancer.BuildOptions
   102  	logger    *grpclog.PrefixLogger
   103  	xdsClient xdsclient.XDSClient
   104  
   105  	config           *LBConfig
   106  	child            *gracefulswitch.Balancer
   107  	cancelLoadReport func()
   108  	edsServiceName   string
   109  	lrsServer        *bootstrap.ServerConfig
   110  	loadWrapper      *loadstore.Wrapper
   111  
   112  	clusterNameMu sync.Mutex
   113  	clusterName   string
   114  
   115  	// The following fields hold the state used by the most recently
   116  	// generated picker. They may only be accessed from run(), which is the
   117  	// only goroutine that sends pickers to the parent ClientConn. All
   118  	// requests to update the picker must be sent to pickerUpdateCh.
   119  	childState            balancer.State
   120  	dropCategories        []DropConfig // The categories for drops.
   121  	drops                 []*dropper
   122  	requestCounterCluster string // The cluster name for the request counter.
   123  	requestCounterService string // The service name for the request counter.
   124  	requestCounter        *xdsclient.ClusterRequestsCounter
   125  	requestCountMax       uint32
   126  	telemetryLabels       map[string]string
   127  	pickerUpdateCh        *buffer.Unbounded
   128  }
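
The fields above, together with pickerUpdateCh, implement a single-writer pattern: the exported methods only enqueue updates, and run() is the only goroutine that pushes state to the parent ClientConn. Below is a minimal, self-contained sketch of that pattern using a plain buffered channel in place of the internal buffer.Unbounded type; all names in it are made up for illustration.

	package main

	import (
		"fmt"
		"sync"
	)

	// serializer funnels all updates through a single goroutine, mirroring how
	// clusterImplBalancer forwards picker updates only from run().
	type serializer struct {
		updates chan string
		done    chan struct{}
		wg      sync.WaitGroup
	}

	func newSerializer() *serializer {
		s := &serializer{updates: make(chan string, 16), done: make(chan struct{})}
		s.wg.Add(1)
		go s.run()
		return s
	}

	func (s *serializer) run() {
		defer s.wg.Done()
		for {
			select {
			case u := <-s.updates:
				// Stands in for ClientConn.UpdateState in the real balancer.
				fmt.Println("forwarding to parent:", u)
			case <-s.done:
				// Like the real balancer, pending updates may be dropped on close.
				return
			}
		}
	}

	func (s *serializer) put(u string) { s.updates <- u }

	func (s *serializer) close() { close(s.done); s.wg.Wait() }

	func main() {
		s := newSerializer()
		s.put("child picker update")
		s.put("new LB config")
		s.close()
	}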
   129  
   130  // updateLoadStore checks the load reporting configuration in newConfig and
   131  // decides whether the load reporting stream needs to be restarted.
   132  func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
   133  	var updateLoadClusterAndService bool
   134  
   135  	// The load reporting name is built from the cluster name and the EDS
   136  	// service name; if either changes, it needs to be updated.
   137  	clusterName := b.getClusterName()
   138  	if clusterName != newConfig.Cluster {
   139  		updateLoadClusterAndService = true
   140  		b.setClusterName(newConfig.Cluster)
   141  		clusterName = newConfig.Cluster
   142  	}
   143  	if b.edsServiceName != newConfig.EDSServiceName {
   144  		updateLoadClusterAndService = true
   145  		b.edsServiceName = newConfig.EDSServiceName
   146  	}
   147  	if updateLoadClusterAndService {
   148  		// This updates the clusterName and serviceName that will be reported
   149  		// for the loads. The update here is slightly early; ideally it would
   150  		// happen when the picker is updated with the new connection, but this
   151  		// balancer has no way to tell when that happens.
   152  		//
   153  		// In practice this rarely matters: an LRS policy shouldn't receive an
   154  		// updated config, because the parent is expected to do a graceful
   155  		// switch when the clusterName or serviceName changes.
   156  		b.loadWrapper.UpdateClusterAndService(clusterName, b.edsServiceName)
   157  	}
   158  
   159  	var (
   160  		stopOldLoadReport  bool
   161  		startNewLoadReport bool
   162  	)
   163  
   164  	// Check if it's necessary to restart load report.
   165  	if b.lrsServer == nil {
   166  		if newConfig.LoadReportingServer != nil {
   167  			// Old is nil, new is not nil, start new LRS.
   168  			b.lrsServer = newConfig.LoadReportingServer
   169  			startNewLoadReport = true
   170  		}
   171  		// Old is nil, new is nil, do nothing.
   172  	} else if newConfig.LoadReportingServer == nil {
   173  		// Old is not nil, new is nil, stop old, don't start new.
   174  		b.lrsServer = newConfig.LoadReportingServer
   175  		stopOldLoadReport = true
   176  	} else {
   177  		// Both the old and the new config are non-nil. If they differ, stop
   178  		// the old stream and start a new one.
   179  		if !b.lrsServer.Equal(newConfig.LoadReportingServer) {
   180  			b.lrsServer = newConfig.LoadReportingServer
   181  			stopOldLoadReport = true
   182  			startNewLoadReport = true
   183  		}
   184  	}
   185  
   186  	if stopOldLoadReport {
   187  		if b.cancelLoadReport != nil {
   188  			b.cancelLoadReport()
   189  			b.cancelLoadReport = nil
   190  			if !startNewLoadReport {
   191  				// If a new LRS stream will be started later, no need to update
   192  				// it to nil here.
   193  				b.loadWrapper.UpdateLoadStore(nil)
   194  			}
   195  		}
   196  	}
   197  	if startNewLoadReport {
   198  		var loadStore *load.Store
   199  		if b.xdsClient != nil {
   200  			loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer)
   201  		}
   202  		b.loadWrapper.UpdateLoadStore(loadStore)
   203  	}
   204  
   205  	return nil
   206  }
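
The branching above reduces to a small decision table over the old and new LRS server configs: start reporting when going from unset to set, stop when going from set to unset, and restart when both are set but differ. A standalone sketch of that decision, using plain strings instead of *bootstrap.ServerConfig (the function and its names are hypothetical):

	package main

	import "fmt"

	// lrsTransition mirrors the decision in updateLoadStore: it reports whether
	// the old load reporting stream must be stopped and whether a new one must
	// be started. An empty string stands in for "no LRS server configured".
	func lrsTransition(oldServer, newServer string) (stop, start bool) {
		switch {
		case oldServer == "" && newServer != "":
			return false, true // nothing running yet, start a new stream
		case oldServer != "" && newServer == "":
			return true, false // stop the old stream, nothing to start
		case oldServer != newServer:
			return true, true // server changed, restart
		default:
			return false, false // unchanged, leave the stream alone
		}
	}

	func main() {
		fmt.Println(lrsTransition("", "lrs.example.com"))            // false true
		fmt.Println(lrsTransition("lrs.example.com", ""))            // true false
		fmt.Println(lrsTransition("a.example.com", "b.example.com")) // true true
		fmt.Println(lrsTransition("a.example.com", "a.example.com")) // false false
	}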
   207  
   208  func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
   209  	if b.closed.HasFired() {
   210  		b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
   211  		return nil
   212  	}
   213  
   214  	if b.logger.V(2) {
   215  		b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
   216  	}
   217  	newConfig, ok := s.BalancerConfig.(*LBConfig)
   218  	if !ok {
   219  		return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
   220  	}
   221  
   222  	// Check for potential errors at the beginning of this function, so that
   223  	// on error the whole config is rejected instead of being applied
   224  	// partially.
   225  	bb := balancer.Get(newConfig.ChildPolicy.Name)
   226  	if bb == nil {
   227  		return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
   228  	}
   229  
   230  	if b.xdsClient == nil {
   231  		c := xdsclient.FromResolverState(s.ResolverState)
   232  		if c == nil {
   233  			return balancer.ErrBadResolverState
   234  		}
   235  		b.xdsClient = c
   236  	}
   237  
   238  	// Update load reporting config. This needs to be done before updating the
   239  	// child policy because we need the loadStore from the updated client to be
   240  	// passed to the ccWrapper, so that the next picker from the child policy
   241  	// will pick up the new loadStore.
   242  	if err := b.updateLoadStore(newConfig); err != nil {
   243  		return err
   244  	}
   245  
   246  	if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
   247  		if err := b.child.SwitchTo(bb); err != nil {
   248  			return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err)
   249  		}
   250  	}
   251  	b.config = newConfig
   252  
   253  	// Notify run() of this new config, in case the drop config or the request
   254  	// counter needs an update (which means a new picker needs to be generated).
   255  	b.pickerUpdateCh.Put(newConfig)
   256  
   257  	// Addresses and sub-balancer config are sent to sub-balancer.
   258  	return b.child.UpdateClientConnState(balancer.ClientConnState{
   259  		ResolverState:  s.ResolverState,
   260  		BalancerConfig: b.config.ChildPolicy.Config,
   261  	})
   262  }
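
Note the shape of UpdateClientConnState: every part of the update is validated (config type, child policy registration, xDS client availability) before any state is mutated, so a bad config is rejected as a whole rather than half-applied. A tiny standalone sketch of that validate-then-apply shape, with hypothetical types and names:

	package main

	import (
		"errors"
		"fmt"
	)

	type config struct {
		childPolicy string
		cluster     string
	}

	type lb struct {
		applied config
	}

	// update validates every part of cfg before applying any of it, mirroring
	// the structure of UpdateClientConnState above.
	func (b *lb) update(cfg config) error {
		// Validation phase: nothing has been mutated yet.
		if cfg.childPolicy == "" {
			return errors.New("child policy missing")
		}
		if cfg.cluster == "" {
			return errors.New("cluster name missing")
		}
		// Apply phase: reached only when the whole config is acceptable.
		b.applied = cfg
		return nil
	}

	func main() {
		b := &lb{}
		fmt.Println(b.update(config{childPolicy: "round_robin"})) // rejected
		fmt.Println(b.applied)                                    // still the zero value
		fmt.Println(b.update(config{childPolicy: "round_robin", cluster: "cluster_a"}))
		fmt.Println(b.applied)
	}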
   263  
   264  func (b *clusterImplBalancer) ResolverError(err error) {
   265  	if b.closed.HasFired() {
   266  		b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
   267  		return
   268  	}
   269  	b.child.ResolverError(err)
   270  }
   271  
   272  func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
   273  	if b.closed.HasFired() {
   274  		b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
   275  		return
   276  	}
   277  
   278  	// Trigger re-resolution when a SubConn enters TRANSIENT_FAILURE. This is
   279  	// necessary for the LogicalDNS cluster type in the cluster_resolver policy
   280  	// to re-resolve.
   281  	//
   282  	// Note that this happens for addresses from EDS as well as from DNS;
   283  	// cluster_impl can't tell which, only the parent knows. The parent priority
   284  	// policy is configured to ignore re-resolution signals from its EDS children.
   285  	if s.ConnectivityState == connectivity.TransientFailure {
   286  		b.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
   287  	}
   288  
   289  	if cb != nil {
   290  		cb(s)
   291  	}
   292  }
   293  
   294  func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
   295  	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, s)
   296  }
   297  
   298  func (b *clusterImplBalancer) Close() {
   299  	b.mu.Lock()
   300  	b.closed.Fire()
   301  	b.mu.Unlock()
   302  
   303  	b.child.Close()
   304  	b.childState = balancer.State{}
   305  	b.pickerUpdateCh.Close()
   306  	<-b.done.Done()
   307  	b.logger.Infof("Shutdown")
   308  }
   309  
   310  func (b *clusterImplBalancer) ExitIdle() {
   311  	b.child.ExitIdle()
   312  }
   313  
   314  // Override methods to accept updates from the child LB.
   315  
   316  func (b *clusterImplBalancer) UpdateState(state balancer.State) {
   317  	// Instead of updating parent ClientConn inline, send state to run().
   318  	b.pickerUpdateCh.Put(state)
   319  }
   320  
   321  func (b *clusterImplBalancer) setClusterName(n string) {
   322  	b.clusterNameMu.Lock()
   323  	defer b.clusterNameMu.Unlock()
   324  	b.clusterName = n
   325  }
   326  
   327  func (b *clusterImplBalancer) getClusterName() string {
   328  	b.clusterNameMu.Lock()
   329  	defer b.clusterNameMu.Unlock()
   330  	return b.clusterName
   331  }
   332  
   333  // scWrapper wraps a SubConn together with its locality ID, which is retrieved
   334  // from the addresses when the SubConn is created.
   335  //
   336  // All SubConns passed to the child policies are wrapped in this, so that the
   337  // picker can get the localityID from the picked SubConn and do load reporting.
   338  //
   339  // After wrapping, all SubConns passed to and from the parent ClientConn (e.g.
   340  // in NewSubConn and UpdateAddresses) must be the original SubConns, while all
   341  // SubConns passed to and from the child policy must be the wrapper. SubConn
   342  // state updates reach the child through the listener captured in NewSubConn,
   343  // so no mapping from SubConn to wrapper is needed.
   344  type scWrapper struct {
   345  	balancer.SubConn
   346  	// locality needs to be atomic because it can be updated while being read by
   347  	// the picker.
   348  	locality atomic.Value // type xdsinternal.LocalityID
   349  }
   350  
   351  func (scw *scWrapper) updateLocalityID(lID xdsinternal.LocalityID) {
   352  	scw.locality.Store(lID)
   353  }
   354  
   355  func (scw *scWrapper) localityID() xdsinternal.LocalityID {
   356  	lID, _ := scw.locality.Load().(xdsinternal.LocalityID)
   357  	return lID
   358  }
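
The atomic.Value lets the picker read the locality while UpdateAddresses may be rewriting it concurrently, without extra locking. A minimal standalone sketch of the same pattern, with a hypothetical locality type standing in for xdsinternal.LocalityID:

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	// locality is a stand-in for xdsinternal.LocalityID.
	type locality struct{ Region, Zone, SubZone string }

	// conn plays the role of scWrapper: it carries a locality that can be
	// updated and read concurrently.
	type conn struct {
		loc atomic.Value // always stores a locality value
	}

	func (c *conn) setLocality(l locality) { c.loc.Store(l) }

	func (c *conn) locality() locality {
		// The ok value is ignored so an unset locality reads as the zero
		// value, just like scWrapper.localityID above.
		l, _ := c.loc.Load().(locality)
		return l
	}

	func main() {
		c := &conn{}
		fmt.Println(c.locality()) // zero value before any Store
		c.setLocality(locality{Region: "us-central1", Zone: "us-central1-a"})
		fmt.Println(c.locality())
	}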
   359  
   360  func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
   361  	clusterName := b.getClusterName()
   362  	newAddrs := make([]resolver.Address, len(addrs))
   363  	var lID xdsinternal.LocalityID
   364  	for i, addr := range addrs {
   365  		newAddrs[i] = xds.SetXDSHandshakeClusterName(addr, clusterName)
   366  		lID = xdsinternal.GetLocalityID(newAddrs[i])
   367  	}
   368  	var sc balancer.SubConn
   369  	oldListener := opts.StateListener
   370  	opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) }
   371  	sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
   372  	if err != nil {
   373  		return nil, err
   374  	}
   375  	// Wrap this SubConn in a wrapper, and add it to the map.
   376  	ret := &scWrapper{SubConn: sc}
   377  	ret.updateLocalityID(lID)
   378  	return ret, nil
   379  }
   380  
   381  func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) {
   382  	b.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
   383  }
   384  
   385  func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
   386  	clusterName := b.getClusterName()
   387  	newAddrs := make([]resolver.Address, len(addrs))
   388  	var lID xdsinternal.LocalityID
   389  	for i, addr := range addrs {
   390  		newAddrs[i] = xds.SetXDSHandshakeClusterName(addr, clusterName)
   391  		lID = xdsinternal.GetLocalityID(newAddrs[i])
   392  	}
   393  	if scw, ok := sc.(*scWrapper); ok {
   394  		scw.updateLocalityID(lID)
   395  		// Need to get the original SubConn from the wrapper before calling
   396  		// parent ClientConn.
   397  		sc = scw.SubConn
   398  	}
   399  	b.ClientConn.UpdateAddresses(sc, newAddrs)
   400  }
   401  
   402  type dropConfigs struct {
   403  	drops           []*dropper
   404  	requestCounter  *xdsclient.ClusterRequestsCounter
   405  	requestCountMax uint32
   406  }
   407  
   408  // handleDropAndRequestCount compares drop and request counter in newConfig with
   409  // the one currently used by picker. It returns a new dropConfigs if a new
   410  // picker needs to be generated, otherwise it returns nil.
   411  func (b *clusterImplBalancer) handleDropAndRequestCount(newConfig *LBConfig) *dropConfigs {
   412  	// Compare the new drop config, and update the picker if it has changed.
   413  	var updatePicker bool
   414  	if !equalDropCategories(b.dropCategories, newConfig.DropCategories) {
   415  		b.dropCategories = newConfig.DropCategories
   416  		b.drops = make([]*dropper, 0, len(newConfig.DropCategories))
   417  		for _, c := range newConfig.DropCategories {
   418  			b.drops = append(b.drops, newDropper(c))
   419  		}
   420  		updatePicker = true
   421  	}
   422  
   423  	// Compare the cluster and EDS service names, and update the picker if
   424  	// either has changed, because circuit breaking's stream counter will differ.
   425  	if b.requestCounterCluster != newConfig.Cluster || b.requestCounterService != newConfig.EDSServiceName {
   426  		b.requestCounterCluster = newConfig.Cluster
   427  		b.requestCounterService = newConfig.EDSServiceName
   428  		b.requestCounter = xdsclient.GetClusterRequestsCounter(newConfig.Cluster, newConfig.EDSServiceName)
   429  		updatePicker = true
   430  	}
   431  	// Compare the upper bound on the stream count, and update the picker if
   432  	// it has changed. This is also for circuit breaking.
   433  	var newRequestCountMax uint32 = defaultRequestCountMax
   434  	if newConfig.MaxConcurrentRequests != nil {
   435  		newRequestCountMax = *newConfig.MaxConcurrentRequests
   436  	}
   437  	if b.requestCountMax != newRequestCountMax {
   438  		b.requestCountMax = newRequestCountMax
   439  		updatePicker = true
   440  	}
   441  
   442  	if !updatePicker {
   443  		return nil
   444  	}
   445  	return &dropConfigs{
   446  		drops:           b.drops,
   447  		requestCounter:  b.requestCounter,
   448  		requestCountMax: b.requestCountMax,
   449  	}
   450  }
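
The dropConfigs returned here feed the picker built elsewhere in this package: per-category droppers decide whether to drop an RPC outright, and the requests counter enforces the max-concurrent-requests bound for circuit breaking. A minimal sketch of the counting half, with a hypothetical atomic gate standing in for xdsclient.ClusterRequestsCounter:

	package main

	import (
		"errors"
		"fmt"
		"sync/atomic"
	)

	var errMaxRequests = errors.New("max concurrent requests reached")

	// requestGate is a stand-in for xdsclient.ClusterRequestsCounter: it admits
	// an RPC only while the number of in-flight RPCs is below max.
	type requestGate struct {
		inFlight atomic.Int64
		max      int64
	}

	// start tries to admit one RPC. The returned func must be called when the
	// RPC finishes; errMaxRequests is returned when the bound is hit.
	func (g *requestGate) start() (done func(), err error) {
		if g.inFlight.Add(1) > g.max {
			g.inFlight.Add(-1)
			return nil, errMaxRequests
		}
		return func() { g.inFlight.Add(-1) }, nil
	}

	func main() {
		g := &requestGate{max: 2}
		done1, _ := g.start()
		done2, _ := g.start()
		if _, err := g.start(); err != nil {
			fmt.Println(err) // the third concurrent RPC is rejected
		}
		done1()
		done2()
	}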
   451  
   452  func (b *clusterImplBalancer) run() {
   453  	defer b.done.Fire()
   454  	for {
   455  		select {
   456  		case update, ok := <-b.pickerUpdateCh.Get():
   457  			if !ok {
   458  				return
   459  			}
   460  			b.pickerUpdateCh.Load()
   461  			b.mu.Lock()
   462  			if b.closed.HasFired() {
   463  				b.mu.Unlock()
   464  				return
   465  			}
   466  			switch u := update.(type) {
   467  			case balancer.State:
   468  				b.childState = u
   469  				b.ClientConn.UpdateState(balancer.State{
   470  					ConnectivityState: b.childState.ConnectivityState,
   471  					Picker: b.newPicker(&dropConfigs{
   472  						drops:           b.drops,
   473  						requestCounter:  b.requestCounter,
   474  						requestCountMax: b.requestCountMax,
   475  					}),
   476  				})
   477  			case *LBConfig:
   478  				b.telemetryLabels = u.TelemetryLabels
   479  				dc := b.handleDropAndRequestCount(u)
   480  				if dc != nil && b.childState.Picker != nil {
   481  					b.ClientConn.UpdateState(balancer.State{
   482  						ConnectivityState: b.childState.ConnectivityState,
   483  						Picker:            b.newPicker(dc),
   484  					})
   485  				}
   486  			}
   487  			b.mu.Unlock()
   488  		case <-b.closed.Done():
   489  			if b.cancelLoadReport != nil {
   490  				b.cancelLoadReport()
   491  				b.cancelLoadReport = nil
   492  			}
   493  			return
   494  		}
   495  	}
   496  }
   497  
