...

Source file src/google.golang.org/grpc/balancer/grpclb/grpclb.go

Documentation: google.golang.org/grpc/balancer/grpclb

     1  /*
     2   *
     3   * Copyright 2016 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package grpclb defines a grpclb balancer.
    20  //
    21  // To install grpclb balancer, import this package as:
    22  //
    23  //	import _ "google.golang.org/grpc/balancer/grpclb"
    24  package grpclb
    25  
    26  import (
    27  	"context"
    28  	"errors"
    29  	"fmt"
    30  	"sync"
    31  	"time"
    32  
    33  	"google.golang.org/grpc"
    34  	"google.golang.org/grpc/balancer"
    35  	"google.golang.org/grpc/balancer/base"
    36  	grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
    37  	"google.golang.org/grpc/connectivity"
    38  	"google.golang.org/grpc/credentials"
    39  	"google.golang.org/grpc/grpclog"
    40  	"google.golang.org/grpc/internal"
    41  	"google.golang.org/grpc/internal/backoff"
    42  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    43  	"google.golang.org/grpc/internal/pretty"
    44  	"google.golang.org/grpc/internal/resolver/dns"
    45  	"google.golang.org/grpc/resolver"
    46  	"google.golang.org/grpc/resolver/manual"
    47  	"google.golang.org/protobuf/types/known/durationpb"
    48  
    49  	lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
    50  )
    51  
// Package-level constants:
//   - lbTokenKey is the literal key "lb-token".
//     NOTE(review): presumably the metadata key that carries the load
//     balancer token; its use is not visible in this part of the file.
//   - defaultFallbackTimeout is the fallback timeout used by newLBBuilder.
//   - grpclbName is the name reported by lbBuilder.Name.
const (
	lbTokenKey             = "lb-token"
	defaultFallbackTimeout = 10 * time.Second
	grpclbName             = "grpclb"
)

// errServerTerminatedConnection is a sentinel error describing a server list
// receive that failed because the server terminated the connection.
var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")

// logger is the package-wide "grpclb" log component; each balancer derives a
// prefixed logger from it in Build.
var logger = grpclog.Component("grpclb")
    60  
    61  func convertDuration(d *durationpb.Duration) time.Duration {
    62  	if d == nil {
    63  		return 0
    64  	}
    65  	return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
    66  }
    67  
// loadBalancerClient is the client API for the LoadBalancer service.
//
// It is mostly copied from the generated pb.go file instead of being imported,
// in order to avoid a circular dependency.
type loadBalancerClient struct {
	cc *grpc.ClientConn
}
    74  
    75  func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
    76  	desc := &grpc.StreamDesc{
    77  		StreamName:    "BalanceLoad",
    78  		ServerStreams: true,
    79  		ClientStreams: true,
    80  	}
    81  	stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
    82  	if err != nil {
    83  		return nil, err
    84  	}
    85  	x := &balanceLoadClientStream{stream}
    86  	return x, nil
    87  }
    88  
// balanceLoadClientStream wraps the grpc.ClientStream returned for a
// BalanceLoad call and adds typed Send and Recv helpers on top of it.
type balanceLoadClientStream struct {
	grpc.ClientStream
}
    92  
    93  func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
    94  	return x.ClientStream.SendMsg(m)
    95  }
    96  
    97  func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
    98  	m := new(lbpb.LoadBalanceResponse)
    99  	if err := x.ClientStream.RecvMsg(m); err != nil {
   100  		return nil, err
   101  	}
   102  	return m, nil
   103  }
   104  
   105  func init() {
   106  	balancer.Register(newLBBuilder())
   107  	dns.EnableSRVLookups = true
   108  }
   109  
   110  // newLBBuilder creates a builder for grpclb.
   111  func newLBBuilder() balancer.Builder {
   112  	return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
   113  }
   114  
   115  // newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
   116  // fallbackTimeout. If no response is received from the remote balancer within
   117  // fallbackTimeout, the backend addresses from the resolved address list will be
   118  // used.
   119  //
   120  // Only call this function when a non-default fallback timeout is needed.
   121  func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
   122  	return &lbBuilder{
   123  		fallbackTimeout: fallbackTimeout,
   124  	}
   125  }
   126  
// lbBuilder builds grpclb balancers. Every balancer it builds inherits its
// fallbackTimeout.
type lbBuilder struct {
	fallbackTimeout time.Duration
}
   130  
// Name returns grpclbName ("grpclb"), the name of this balancer builder.
func (b *lbBuilder) Name() string {
	return grpclbName
}
   134  
   135  func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
   136  	// This generates a manual resolver builder with a fixed scheme. This
   137  	// scheme will be used to dial to remote LB, so we can send filtered
   138  	// address updates to remote LB ClientConn using this manual resolver.
   139  	mr := manual.NewBuilderWithScheme("grpclb-internal")
   140  	// ResolveNow() on this manual resolver is forwarded to the parent
   141  	// ClientConn, so when grpclb client loses contact with the remote balancer,
   142  	// the parent ClientConn's resolver will re-resolve.
   143  	mr.ResolveNowCallback = cc.ResolveNow
   144  
   145  	lb := &lbBalancer{
   146  		cc:              newLBCacheClientConn(cc),
   147  		dialTarget:      opt.Target.Endpoint(),
   148  		target:          opt.Target.Endpoint(),
   149  		opt:             opt,
   150  		fallbackTimeout: b.fallbackTimeout,
   151  		doneCh:          make(chan struct{}),
   152  
   153  		manualResolver: mr,
   154  		subConns:       make(map[resolver.Address]balancer.SubConn),
   155  		scStates:       make(map[balancer.SubConn]connectivity.State),
   156  		picker:         base.NewErrPicker(balancer.ErrNoSubConnAvailable),
   157  		clientStats:    newRPCStats(),
   158  		backoff:        backoff.DefaultExponential, // TODO: make backoff configurable.
   159  	}
   160  	lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[grpclb %p] ", lb))
   161  
   162  	var err error
   163  	if opt.CredsBundle != nil {
   164  		lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
   165  		if err != nil {
   166  			lb.logger.Warningf("Failed to create credentials used for connecting to grpclb: %v", err)
   167  		}
   168  		lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
   169  		if err != nil {
   170  			lb.logger.Warningf("Failed to create credentials used for connecting to backends returned by grpclb: %v", err)
   171  		}
   172  	}
   173  
   174  	return lb
   175  }
   176  
// lbBalancer is the grpclb balancer created by lbBuilder.Build. It talks to a
// remote balancer to obtain a server list, and can fall back to the backend
// addresses provided by the resolver.
type lbBalancer struct {
	// cc is the caching wrapper created around the parent ClientConn in Build.
	cc         *lbCacheClientConn
	dialTarget string // user's dial target
	target     string // same as dialTarget unless overridden in service config
	opt        balancer.BuildOptions
	logger     *internalgrpclog.PrefixLogger

	// usePickFirst reports whether pick_first (rather than round robin) is
	// used for backends; it is read when generating pickers and updating
	// state.
	usePickFirst bool

	// grpclbClientConnCreds is the creds bundle to be used to connect to grpclb
	// servers. If it's nil, use the TransportCredentials from BuildOptions
	// instead.
	grpclbClientConnCreds credentials.Bundle
	// grpclbBackendCreds is the creds bundle to be used for addresses that are
	// returned by grpclb server. If it's nil, don't set anything when creating
	// SubConns.
	grpclbBackendCreds credentials.Bundle

	// fallbackTimeout is how long fallbackToBackendsAfter waits before
	// entering fallback; doneCh is closed by Close and aborts that wait.
	fallbackTimeout time.Duration
	doneCh          chan struct{}

	// manualResolver is used in the remote LB ClientConn inside grpclb. When
	// resolved address updates are received by grpclb, filtered updates will be
	// sent to remote LB ClientConn through this resolver.
	manualResolver *manual.Resolver
	// The ClientConn to talk to the remote balancer.
	ccRemoteLB *remoteBalancerCCWrapper
	// backoff for calling remote balancer.
	backoff backoff.Strategy

	// Support client side load reporting. Each picker gets a reference to this,
	// and will update its content.
	clientStats *rpcStats

	mu sync.Mutex // guards everything following.
	// The full server list including drops, used to check if the newly received
	// serverList contains anything new. Each generated picker will also have a
	// reference to this list to do the first layer pick.
	fullServerList []*lbpb.Server
	// Backend addresses. It's kept so the addresses are available when
	// switching between round_robin and pickfirst.
	backendAddrs []resolver.Address
	// All backends addresses, with metadata set to nil. This list contains all
	// backend addresses in the same order and with the same duplicates as in
	// serverlist. When generating picker, a SubConn slice with the same order
	// but with only READY SCs will be generated.
	backendAddrsWithoutMetadata []resolver.Address
	// Roundrobin functionalities.
	state    connectivity.State
	subConns map[resolver.Address]balancer.SubConn   // Used to new/shutdown SubConn.
	scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
	picker   balancer.Picker
	// Support fallback to resolved backend addresses if there's no response
	// from remote balancer within fallbackTimeout.
	remoteBalancerConnected bool
	serverListReceived      bool
	inFallback              bool
	// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
	// when resolved address updates are received, and read in the goroutine
	// handling fallback.
	resolvedBackendAddrs []resolver.Address
	connErr              error // the last connection error
}
   240  
   241  // regeneratePicker takes a snapshot of the balancer, and generates a picker from
   242  // it. The picker
   243  //   - always returns ErrTransientFailure if the balancer is in TransientFailure,
   244  //   - does two layer roundrobin pick otherwise.
   245  //
   246  // Caller must hold lb.mu.
   247  func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
   248  	if lb.state == connectivity.TransientFailure {
   249  		lb.picker = base.NewErrPicker(fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr))
   250  		return
   251  	}
   252  
   253  	if lb.state == connectivity.Connecting {
   254  		lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
   255  		return
   256  	}
   257  
   258  	var readySCs []balancer.SubConn
   259  	if lb.usePickFirst {
   260  		for _, sc := range lb.subConns {
   261  			readySCs = append(readySCs, sc)
   262  			break
   263  		}
   264  	} else {
   265  		for _, a := range lb.backendAddrsWithoutMetadata {
   266  			if sc, ok := lb.subConns[a]; ok {
   267  				if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
   268  					readySCs = append(readySCs, sc)
   269  				}
   270  			}
   271  		}
   272  	}
   273  
   274  	if len(readySCs) <= 0 {
   275  		// If there's no ready SubConns, always re-pick. This is to avoid drops
   276  		// unless at least one SubConn is ready. Otherwise we may drop more
   277  		// often than want because of drops + re-picks(which become re-drops).
   278  		//
   279  		// This doesn't seem to be necessary after the connecting check above.
   280  		// Kept for safety.
   281  		lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
   282  		return
   283  	}
   284  	if lb.inFallback {
   285  		lb.picker = newRRPicker(readySCs)
   286  		return
   287  	}
   288  	if resetDrop {
   289  		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
   290  		return
   291  	}
   292  	prevLBPicker, ok := lb.picker.(*lbPicker)
   293  	if !ok {
   294  		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
   295  		return
   296  	}
   297  	prevLBPicker.updateReadySCs(readySCs)
   298  }
   299  
   300  // aggregateSubConnStats calculate the aggregated state of SubConns in
   301  // lb.SubConns. These SubConns are subconns in use (when switching between
   302  // fallback and grpclb). lb.scState contains states for all SubConns, including
   303  // those in cache (SubConns are cached for 10 seconds after shutdown).
   304  //
   305  //	The aggregated state is:
   306  //	- If at least one SubConn in Ready, the aggregated state is Ready;
   307  //	- Else if at least one SubConn in Connecting or IDLE, the aggregated state is Connecting;
   308  //	  - It's OK to consider IDLE as Connecting. SubConns never stay in IDLE,
   309  //	    they start to connect immediately. But there's a race between the overall
   310  //	    state is reported, and when the new SubConn state arrives. And SubConns
   311  //	    never go back to IDLE.
   312  //	- Else the aggregated state is TransientFailure.
   313  func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
   314  	var numConnecting uint64
   315  
   316  	for _, sc := range lb.subConns {
   317  		if state, ok := lb.scStates[sc]; ok {
   318  			switch state {
   319  			case connectivity.Ready:
   320  				return connectivity.Ready
   321  			case connectivity.Connecting, connectivity.Idle:
   322  				numConnecting++
   323  			}
   324  		}
   325  	}
   326  	if numConnecting > 0 {
   327  		return connectivity.Connecting
   328  	}
   329  	return connectivity.TransientFailure
   330  }
   331  
// UpdateSubConnState is unused; NewSubConn's options always specify
// updateSubConnState as the listener. If it is called anyway, it only logs an
// error and changes no state.
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
	lb.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
}
   337  
   338  func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
   339  	s := scs.ConnectivityState
   340  	if lb.logger.V(2) {
   341  		lb.logger.Infof("SubConn state change: %p, %v", sc, s)
   342  	}
   343  	lb.mu.Lock()
   344  	defer lb.mu.Unlock()
   345  
   346  	oldS, ok := lb.scStates[sc]
   347  	if !ok {
   348  		if lb.logger.V(2) {
   349  			lb.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
   350  		}
   351  		return
   352  	}
   353  	lb.scStates[sc] = s
   354  	switch s {
   355  	case connectivity.Idle:
   356  		sc.Connect()
   357  	case connectivity.Shutdown:
   358  		// When an address was removed by resolver, b called Shutdown but kept
   359  		// the sc's state in scStates. Remove state for this sc here.
   360  		delete(lb.scStates, sc)
   361  	case connectivity.TransientFailure:
   362  		lb.connErr = scs.ConnectionError
   363  	}
   364  	// Force regenerate picker if
   365  	//  - this sc became ready from not-ready
   366  	//  - this sc became not-ready from ready
   367  	lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)
   368  
   369  	// Enter fallback when the aggregated state is not Ready and the connection
   370  	// to remote balancer is lost.
   371  	if lb.state != connectivity.Ready {
   372  		if !lb.inFallback && !lb.remoteBalancerConnected {
   373  			// Enter fallback.
   374  			lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
   375  		}
   376  	}
   377  }
   378  
   379  // updateStateAndPicker re-calculate the aggregated state, and regenerate picker
   380  // if overall state is changed.
   381  //
   382  // If forceRegeneratePicker is true, picker will be regenerated.
   383  func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
   384  	oldAggrState := lb.state
   385  	lb.state = lb.aggregateSubConnStates()
   386  	// Regenerate picker when one of the following happens:
   387  	//  - caller wants to regenerate
   388  	//  - the aggregated state changed
   389  	if forceRegeneratePicker || (lb.state != oldAggrState) {
   390  		lb.regeneratePicker(resetDrop)
   391  	}
   392  	var cc balancer.ClientConn = lb.cc
   393  	if lb.usePickFirst {
   394  		// Bypass the caching layer that would wrap the picker.
   395  		cc = lb.cc.ClientConn
   396  	}
   397  
   398  	cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
   399  }
   400  
   401  // fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
   402  // resolved backends (backends received from resolver, not from remote balancer)
   403  // if no connection to remote balancers was successful.
   404  func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
   405  	timer := time.NewTimer(fallbackTimeout)
   406  	defer timer.Stop()
   407  	select {
   408  	case <-timer.C:
   409  	case <-lb.doneCh:
   410  		return
   411  	}
   412  	lb.mu.Lock()
   413  	if lb.inFallback || lb.serverListReceived {
   414  		lb.mu.Unlock()
   415  		return
   416  	}
   417  	// Enter fallback.
   418  	lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
   419  	lb.mu.Unlock()
   420  }
   421  
   422  func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
   423  	lb.mu.Lock()
   424  	defer lb.mu.Unlock()
   425  
   426  	// grpclb uses the user's dial target to populate the `Name` field of the
   427  	// `InitialLoadBalanceRequest` message sent to the remote balancer. But when
   428  	// grpclb is used a child policy in the context of RLS, we want the `Name`
   429  	// field to be populated with the value received from the RLS server. To
   430  	// support this use case, an optional "target_name" field has been added to
   431  	// the grpclb LB policy's config.  If specified, it overrides the name of
   432  	// the target to be sent to the remote balancer; if not, the target to be
   433  	// sent to the balancer will continue to be obtained from the target URI
   434  	// passed to the gRPC client channel. Whenever that target to be sent to the
   435  	// balancer is updated, we need to restart the stream to the balancer as
   436  	// this target is sent in the first message on the stream.
   437  	if gc != nil {
   438  		target := lb.dialTarget
   439  		if gc.ServiceName != "" {
   440  			target = gc.ServiceName
   441  		}
   442  		if target != lb.target {
   443  			lb.target = target
   444  			if lb.ccRemoteLB != nil {
   445  				lb.ccRemoteLB.cancelRemoteBalancerCall()
   446  			}
   447  		}
   448  	}
   449  
   450  	newUsePickFirst := childIsPickFirst(gc)
   451  	if lb.usePickFirst == newUsePickFirst {
   452  		return
   453  	}
   454  	if lb.logger.V(2) {
   455  		lb.logger.Infof("Switching mode. Is pick_first used for backends? %v", newUsePickFirst)
   456  	}
   457  	lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
   458  }
   459  
// ResolverError deliberately does nothing with the reported error.
func (lb *lbBalancer) ResolverError(error) {
	// Ignore resolver errors.  GRPCLB is not selected unless the resolver
	// works at least once.
}
   464  
   465  func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
   466  	if lb.logger.V(2) {
   467  		lb.logger.Infof("UpdateClientConnState: %s", pretty.ToJSON(ccs))
   468  	}
   469  	gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
   470  	lb.handleServiceConfig(gc)
   471  
   472  	backendAddrs := ccs.ResolverState.Addresses
   473  
   474  	var remoteBalancerAddrs []resolver.Address
   475  	if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
   476  		// Override any balancer addresses provided via
   477  		// ccs.ResolverState.Addresses.
   478  		remoteBalancerAddrs = sd.BalancerAddresses
   479  	}
   480  
   481  	if len(backendAddrs)+len(remoteBalancerAddrs) == 0 {
   482  		// There should be at least one address, either grpclb server or
   483  		// fallback. Empty address is not valid.
   484  		return balancer.ErrBadResolverState
   485  	}
   486  
   487  	if len(remoteBalancerAddrs) == 0 {
   488  		if lb.ccRemoteLB != nil {
   489  			lb.ccRemoteLB.close()
   490  			lb.ccRemoteLB = nil
   491  		}
   492  	} else if lb.ccRemoteLB == nil {
   493  		// First time receiving resolved addresses, create a cc to remote
   494  		// balancers.
   495  		if err := lb.newRemoteBalancerCCWrapper(); err != nil {
   496  			return err
   497  		}
   498  		// Start the fallback goroutine.
   499  		go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
   500  	}
   501  
   502  	if lb.ccRemoteLB != nil {
   503  		// cc to remote balancers uses lb.manualResolver. Send the updated remote
   504  		// balancer addresses to it through manualResolver.
   505  		lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
   506  	}
   507  
   508  	lb.mu.Lock()
   509  	lb.resolvedBackendAddrs = backendAddrs
   510  	if len(remoteBalancerAddrs) == 0 || lb.inFallback {
   511  		// If there's no remote balancer address in ClientConn update, grpclb
   512  		// enters fallback mode immediately.
   513  		//
   514  		// If a new update is received while grpclb is in fallback, update the
   515  		// list of backends being used to the new fallback backends.
   516  		lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
   517  	}
   518  	lb.mu.Unlock()
   519  	return nil
   520  }
   521  
   522  func (lb *lbBalancer) Close() {
   523  	select {
   524  	case <-lb.doneCh:
   525  		return
   526  	default:
   527  	}
   528  	close(lb.doneCh)
   529  	if lb.ccRemoteLB != nil {
   530  		lb.ccRemoteLB.close()
   531  	}
   532  	lb.cc.close()
   533  }
   534  
// ExitIdle is a no-op for grpclb.
func (lb *lbBalancer) ExitIdle() {}
   536  

View as plain text