...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterresolver

     1  /*
     2   *
     3   * Copyright 2019 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package clusterresolver contains the implementation of the
    20  // cluster_resolver_experimental LB policy which resolves endpoint addresses
    21  // using a list of one or more discovery mechanisms.
    22  package clusterresolver
    23  
    24  import (
    25  	"encoding/json"
    26  	"errors"
    27  	"fmt"
    28  
    29  	"google.golang.org/grpc/attributes"
    30  	"google.golang.org/grpc/balancer"
    31  	"google.golang.org/grpc/balancer/base"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/internal/balancer/nop"
    34  	"google.golang.org/grpc/internal/buffer"
    35  	"google.golang.org/grpc/internal/grpclog"
    36  	"google.golang.org/grpc/internal/grpcsync"
    37  	"google.golang.org/grpc/internal/pretty"
    38  	"google.golang.org/grpc/resolver"
    39  	"google.golang.org/grpc/serviceconfig"
    40  	"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
    41  	"google.golang.org/grpc/xds/internal/balancer/priority"
    42  	"google.golang.org/grpc/xds/internal/xdsclient"
    43  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    44  )
    45  
    46  // Name is the name of the cluster_resolver balancer.
    47  const Name = "cluster_resolver_experimental"
    48  
    49  var (
    50  	errBalancerClosed = errors.New("cdsBalancer is closed")
    51  	newChildBalancer  = func(bb balancer.Builder, cc balancer.ClientConn, o balancer.BuildOptions) balancer.Balancer {
    52  		return bb.Build(cc, o)
    53  	}
    54  )
    55  
    56  func init() {
    57  	balancer.Register(bb{})
    58  }
    59  
    60  type bb struct{}
    61  
    62  // Build helps implement the balancer.Builder interface.
    63  func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    64  	priorityBuilder := balancer.Get(priority.Name)
    65  	if priorityBuilder == nil {
    66  		logger.Errorf("%q LB policy is needed but not registered", priority.Name)
    67  		return nop.NewBalancer(cc, fmt.Errorf("%q LB policy is needed but not registered", priority.Name))
    68  	}
    69  	priorityConfigParser, ok := priorityBuilder.(balancer.ConfigParser)
    70  	if !ok {
    71  		logger.Errorf("%q LB policy does not implement a config parser", priority.Name)
    72  		return nop.NewBalancer(cc, fmt.Errorf("%q LB policy does not implement a config parser", priority.Name))
    73  	}
    74  
    75  	b := &clusterResolverBalancer{
    76  		bOpts:    opts,
    77  		updateCh: buffer.NewUnbounded(),
    78  		closed:   grpcsync.NewEvent(),
    79  		done:     grpcsync.NewEvent(),
    80  
    81  		priorityBuilder:      priorityBuilder,
    82  		priorityConfigParser: priorityConfigParser,
    83  	}
    84  	b.logger = prefixLogger(b)
    85  	b.logger.Infof("Created")
    86  
    87  	b.resourceWatcher = newResourceResolver(b, b.logger)
    88  	b.cc = &ccWrapper{
    89  		ClientConn:      cc,
    90  		b:               b,
    91  		resourceWatcher: b.resourceWatcher,
    92  	}
    93  
    94  	go b.run()
    95  	return b
    96  }
    97  
    98  func (bb) Name() string {
    99  	return Name
   100  }
   101  
   102  func (bb) ParseConfig(j json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
   103  	odBuilder := balancer.Get(outlierdetection.Name)
   104  	if odBuilder == nil {
   105  		// Shouldn't happen, registered through imported Outlier Detection,
   106  		// defensive programming.
   107  		return nil, fmt.Errorf("%q LB policy is needed but not registered", outlierdetection.Name)
   108  	}
   109  	odParser, ok := odBuilder.(balancer.ConfigParser)
   110  	if !ok {
   111  		// Shouldn't happen, imported Outlier Detection builder has this method.
   112  		return nil, fmt.Errorf("%q LB policy does not implement a config parser", outlierdetection.Name)
   113  	}
   114  
   115  	var cfg *LBConfig
   116  	if err := json.Unmarshal(j, &cfg); err != nil {
   117  		return nil, fmt.Errorf("unable to unmarshal balancer config %s into cluster-resolver config, error: %v", string(j), err)
   118  	}
   119  
   120  	for i, dm := range cfg.DiscoveryMechanisms {
   121  		lbCfg, err := odParser.ParseConfig(dm.OutlierDetection)
   122  		if err != nil {
   123  			return nil, fmt.Errorf("error parsing Outlier Detection config %v: %v", dm.OutlierDetection, err)
   124  		}
   125  		odCfg, ok := lbCfg.(*outlierdetection.LBConfig)
   126  		if !ok {
   127  			// Shouldn't happen, Parser built at build time with Outlier Detection
   128  			// builder pulled from gRPC LB Registry.
   129  			return nil, fmt.Errorf("odParser returned config with unexpected type %T: %v", lbCfg, lbCfg)
   130  		}
   131  		cfg.DiscoveryMechanisms[i].outlierDetection = *odCfg
   132  	}
   133  	if err := json.Unmarshal(cfg.XDSLBPolicy, &cfg.xdsLBPolicy); err != nil {
   134  		// This will never occur, valid configuration is emitted from the xDS
   135  		// Client. Validity is already checked in the xDS Client, however, this
   136  		// double validation is present because Unmarshalling and Validating are
   137  		// coupled into one json.Unmarshal operation). We will switch this in
   138  		// the future to two separate operations.
   139  		return nil, fmt.Errorf("error unmarshalling xDS LB Policy: %v", err)
   140  	}
   141  	return cfg, nil
   142  }
   143  
   144  // ccUpdate wraps a clientConn update received from gRPC.
   145  type ccUpdate struct {
   146  	state balancer.ClientConnState
   147  	err   error
   148  }
   149  
   150  type exitIdle struct{}
   151  
   152  // clusterResolverBalancer resolves endpoint addresses using a list of one or
   153  // more discovery mechanisms.
   154  type clusterResolverBalancer struct {
   155  	cc              balancer.ClientConn
   156  	bOpts           balancer.BuildOptions
   157  	updateCh        *buffer.Unbounded // Channel for updates from gRPC.
   158  	resourceWatcher *resourceResolver
   159  	logger          *grpclog.PrefixLogger
   160  	closed          *grpcsync.Event
   161  	done            *grpcsync.Event
   162  
   163  	priorityBuilder      balancer.Builder
   164  	priorityConfigParser balancer.ConfigParser
   165  
   166  	config          *LBConfig
   167  	configRaw       *serviceconfig.ParseResult
   168  	xdsClient       xdsclient.XDSClient    // xDS client to watch EDS resource.
   169  	attrsWithClient *attributes.Attributes // Attributes with xdsClient attached to be passed to the child policies.
   170  
   171  	child               balancer.Balancer
   172  	priorities          []priorityConfig
   173  	watchUpdateReceived bool
   174  }
   175  
   176  // handleClientConnUpdate handles a ClientConnUpdate received from gRPC.
   177  //
   178  // A good update results in creation of endpoint resolvers for the configured
   179  // discovery mechanisms. An update with an error results in cancellation of any
   180  // existing endpoint resolution and propagation of the same to the child policy.
   181  func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
   182  	if err := update.err; err != nil {
   183  		b.handleErrorFromUpdate(err, true)
   184  		return
   185  	}
   186  
   187  	b.logger.Infof("Received new balancer config: %v", pretty.ToJSON(update.state.BalancerConfig))
   188  	cfg, _ := update.state.BalancerConfig.(*LBConfig)
   189  	if cfg == nil {
   190  		b.logger.Warningf("Ignoring unsupported balancer configuration of type: %T", update.state.BalancerConfig)
   191  		return
   192  	}
   193  
   194  	b.config = cfg
   195  	b.configRaw = update.state.ResolverState.ServiceConfig
   196  	b.resourceWatcher.updateMechanisms(cfg.DiscoveryMechanisms)
   197  
   198  	// The child policy is created only after all configured discovery
   199  	// mechanisms have been successfully returned endpoints. If that is not the
   200  	// case, we return early.
   201  	if !b.watchUpdateReceived {
   202  		return
   203  	}
   204  	b.updateChildConfig()
   205  }
   206  
   207  // handleResourceUpdate handles a resource update or error from the resource
   208  // resolver by propagating the same to the child LB policy.
   209  func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) {
   210  	if err := update.err; err != nil {
   211  		b.handleErrorFromUpdate(err, false)
   212  		return
   213  	}
   214  
   215  	b.watchUpdateReceived = true
   216  	b.priorities = update.priorities
   217  
   218  	// An update from the resource resolver contains resolved endpoint addresses
   219  	// for all configured discovery mechanisms ordered by priority. This is used
   220  	// to generate configuration for the priority LB policy.
   221  	b.updateChildConfig()
   222  }
   223  
   224  // updateChildConfig builds child policy configuration using endpoint addresses
   225  // returned by the resource resolver and child policy configuration provided by
   226  // parent LB policy.
   227  //
   228  // A child policy is created if one doesn't already exist. The newly built
   229  // configuration is then pushed to the child policy.
   230  func (b *clusterResolverBalancer) updateChildConfig() {
   231  	if b.child == nil {
   232  		b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts)
   233  	}
   234  
   235  	childCfgBytes, addrs, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy)
   236  	if err != nil {
   237  		b.logger.Warningf("Failed to build child policy config: %v", err)
   238  		return
   239  	}
   240  	childCfg, err := b.priorityConfigParser.ParseConfig(childCfgBytes)
   241  	if err != nil {
   242  		b.logger.Warningf("Failed to parse child policy config. This should never happen because the config was generated: %v", err)
   243  		return
   244  	}
   245  	if b.logger.V(2) {
   246  		b.logger.Infof("Built child policy config: %s", pretty.ToJSON(childCfg))
   247  	}
   248  
   249  	endpoints := make([]resolver.Endpoint, len(addrs))
   250  	for i, a := range addrs {
   251  		endpoints[i].Attributes = a.BalancerAttributes
   252  		endpoints[i].Addresses = []resolver.Address{a}
   253  		endpoints[i].Addresses[0].BalancerAttributes = nil
   254  	}
   255  	if err := b.child.UpdateClientConnState(balancer.ClientConnState{
   256  		ResolverState: resolver.State{
   257  			Endpoints:     endpoints,
   258  			Addresses:     addrs,
   259  			ServiceConfig: b.configRaw,
   260  			Attributes:    b.attrsWithClient,
   261  		},
   262  		BalancerConfig: childCfg,
   263  	}); err != nil {
   264  		b.logger.Warningf("Failed to push config to child policy: %v", err)
   265  	}
   266  }
   267  
   268  // handleErrorFromUpdate handles errors from the parent LB policy and endpoint
   269  // resolvers. fromParent is true if error is from the parent LB policy. In both
   270  // cases, the error is propagated to the child policy, if one exists.
   271  func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bool) {
   272  	b.logger.Warningf("Received error: %v", err)
   273  
   274  	// A resource-not-found error from the parent LB policy means that the LDS
   275  	// or CDS resource was removed. This should result in endpoint resolvers
   276  	// being stopped here.
   277  	//
   278  	// A resource-not-found error from the EDS endpoint resolver means that the
   279  	// EDS resource was removed. No action needs to be taken for this, and we
   280  	// should continue watching the same EDS resource.
   281  	if fromParent && xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
   282  		b.resourceWatcher.stop(false)
   283  	}
   284  
   285  	if b.child != nil {
   286  		b.child.ResolverError(err)
   287  		return
   288  	}
   289  	b.cc.UpdateState(balancer.State{
   290  		ConnectivityState: connectivity.TransientFailure,
   291  		Picker:            base.NewErrPicker(err),
   292  	})
   293  }
   294  
   295  // run is a long-running goroutine that handles updates from gRPC and endpoint
   296  // resolvers. The methods handling the individual updates simply push them onto
   297  // a channel which is read and acted upon from here.
   298  func (b *clusterResolverBalancer) run() {
   299  	for {
   300  		select {
   301  		case u, ok := <-b.updateCh.Get():
   302  			if !ok {
   303  				return
   304  			}
   305  			b.updateCh.Load()
   306  			switch update := u.(type) {
   307  			case *ccUpdate:
   308  				b.handleClientConnUpdate(update)
   309  			case exitIdle:
   310  				if b.child == nil {
   311  					b.logger.Errorf("xds: received ExitIdle with no child balancer")
   312  					break
   313  				}
   314  				// This implementation assumes the child balancer supports
   315  				// ExitIdle (but still checks for the interface's existence to
   316  				// avoid a panic if not).  If the child does not, no subconns
   317  				// will be connected.
   318  				if ei, ok := b.child.(balancer.ExitIdler); ok {
   319  					ei.ExitIdle()
   320  				}
   321  			}
   322  		case u := <-b.resourceWatcher.updateChannel:
   323  			b.handleResourceUpdate(u)
   324  
   325  		// Close results in stopping the endpoint resolvers and closing the
   326  		// underlying child policy and is the only way to exit this goroutine.
   327  		case <-b.closed.Done():
   328  			b.resourceWatcher.stop(true)
   329  
   330  			if b.child != nil {
   331  				b.child.Close()
   332  				b.child = nil
   333  			}
   334  			b.updateCh.Close()
   335  			// This is the *ONLY* point of return from this function.
   336  			b.logger.Infof("Shutdown")
   337  			b.done.Fire()
   338  			return
   339  		}
   340  	}
   341  }
   342  
   343  // Following are methods to implement the balancer interface.
   344  
   345  func (b *clusterResolverBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
   346  	if b.closed.HasFired() {
   347  		b.logger.Warningf("Received update from gRPC {%+v} after close", state)
   348  		return errBalancerClosed
   349  	}
   350  
   351  	if b.xdsClient == nil {
   352  		c := xdsclient.FromResolverState(state.ResolverState)
   353  		if c == nil {
   354  			return balancer.ErrBadResolverState
   355  		}
   356  		b.xdsClient = c
   357  		b.attrsWithClient = state.ResolverState.Attributes
   358  	}
   359  
   360  	b.updateCh.Put(&ccUpdate{state: state})
   361  	return nil
   362  }
   363  
   364  // ResolverError handles errors reported by the xdsResolver.
   365  func (b *clusterResolverBalancer) ResolverError(err error) {
   366  	if b.closed.HasFired() {
   367  		b.logger.Warningf("Received resolver error {%v} after close", err)
   368  		return
   369  	}
   370  	b.updateCh.Put(&ccUpdate{err: err})
   371  }
   372  
   373  // UpdateSubConnState handles subConn updates from gRPC.
   374  func (b *clusterResolverBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
   375  	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
   376  }
   377  
   378  // Close closes the cdsBalancer and the underlying child balancer.
   379  func (b *clusterResolverBalancer) Close() {
   380  	b.closed.Fire()
   381  	<-b.done.Done()
   382  }
   383  
   384  func (b *clusterResolverBalancer) ExitIdle() {
   385  	b.updateCh.Put(exitIdle{})
   386  }
   387  
   388  // ccWrapper overrides ResolveNow(), so that re-resolution from the child
   389  // policies will trigger the DNS resolver in cluster_resolver balancer.  It
   390  // also intercepts NewSubConn calls in case children don't set the
   391  // StateListener, to allow redirection to happen via this cluster_resolver
   392  // balancer.
   393  type ccWrapper struct {
   394  	balancer.ClientConn
   395  	b               *clusterResolverBalancer
   396  	resourceWatcher *resourceResolver
   397  }
   398  
   399  func (c *ccWrapper) ResolveNow(resolver.ResolveNowOptions) {
   400  	c.resourceWatcher.resolveNow()
   401  }
   402  

View as plain text