...

Source file src/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/cdsbalancer.go

Documentation: google.golang.org/grpc/xds/internal/balancer/cdsbalancer

     1  /*
     2   * Copyright 2019 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  // Package cdsbalancer implements a balancer to handle CDS responses.
    18  package cdsbalancer
    19  
    20  import (
    21  	"context"
    22  	"encoding/json"
    23  	"fmt"
    24  	"sync/atomic"
    25  	"unsafe"
    26  
    27  	"google.golang.org/grpc/balancer"
    28  	"google.golang.org/grpc/balancer/base"
    29  	"google.golang.org/grpc/connectivity"
    30  	"google.golang.org/grpc/credentials"
    31  	"google.golang.org/grpc/credentials/tls/certprovider"
    32  	"google.golang.org/grpc/internal/balancer/nop"
    33  	xdsinternal "google.golang.org/grpc/internal/credentials/xds"
    34  	"google.golang.org/grpc/internal/grpclog"
    35  	"google.golang.org/grpc/internal/grpcsync"
    36  	"google.golang.org/grpc/internal/pretty"
    37  	"google.golang.org/grpc/resolver"
    38  	"google.golang.org/grpc/serviceconfig"
    39  	"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
    40  	"google.golang.org/grpc/xds/internal/xdsclient"
    41  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    42  )
    43  
    44  const (
    45  	cdsName                  = "cds_experimental"
    46  	aggregateClusterMaxDepth = 16
    47  )
    48  
    49  var (
    50  	errBalancerClosed  = fmt.Errorf("cds_experimental LB policy is closed")
    51  	errExceedsMaxDepth = fmt.Errorf("aggregate cluster graph exceeds max depth (%d)", aggregateClusterMaxDepth)
    52  
    53  	// newChildBalancer is a helper function to build a new cluster_resolver
    54  	// balancer and will be overridden in unittests.
    55  	newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
    56  		builder := balancer.Get(clusterresolver.Name)
    57  		if builder == nil {
    58  			return nil, fmt.Errorf("xds: no balancer builder with name %v", clusterresolver.Name)
    59  		}
    60  		// We directly pass the parent clientConn to the underlying
    61  		// cluster_resolver balancer because the cdsBalancer does not deal with
    62  		// subConns.
    63  		return builder.Build(cc, opts), nil
    64  	}
    65  	buildProvider = buildProviderFunc
    66  )
    67  
    68  func init() {
    69  	balancer.Register(bb{})
    70  }
    71  
// bb implements the balancer.Builder interface to help build a cdsBalancer.
// It also implements the balancer.ConfigParser interface to help parse the
// JSON service config, to be passed to the cdsBalancer. It carries no state;
// a zero value is registered with the balancer registry in init.
type bb struct{}
    76  
    77  // Build creates a new CDS balancer with the ClientConn.
    78  func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    79  	builder := balancer.Get(clusterresolver.Name)
    80  	if builder == nil {
    81  		// Shouldn't happen, registered through imported Cluster Resolver,
    82  		// defensive programming.
    83  		logger.Errorf("%q LB policy is needed but not registered", clusterresolver.Name)
    84  		return nop.NewBalancer(cc, fmt.Errorf("%q LB policy is needed but not registered", clusterresolver.Name))
    85  	}
    86  	parser, ok := builder.(balancer.ConfigParser)
    87  	if !ok {
    88  		// Shouldn't happen, imported Cluster Resolver builder has this method.
    89  		logger.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name)
    90  		return nop.NewBalancer(cc, fmt.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name))
    91  	}
    92  
    93  	ctx, cancel := context.WithCancel(context.Background())
    94  	hi := xdsinternal.NewHandshakeInfo(nil, nil, nil, false)
    95  	xdsHIPtr := unsafe.Pointer(hi)
    96  	b := &cdsBalancer{
    97  		bOpts:             opts,
    98  		childConfigParser: parser,
    99  		serializer:        grpcsync.NewCallbackSerializer(ctx),
   100  		serializerCancel:  cancel,
   101  		xdsHIPtr:          &xdsHIPtr,
   102  		watchers:          make(map[string]*watcherState),
   103  	}
   104  	b.ccw = &ccWrapper{
   105  		ClientConn: cc,
   106  		xdsHIPtr:   b.xdsHIPtr,
   107  	}
   108  	b.logger = prefixLogger(b)
   109  	b.logger.Infof("Created")
   110  
   111  	var creds credentials.TransportCredentials
   112  	switch {
   113  	case opts.DialCreds != nil:
   114  		creds = opts.DialCreds
   115  	case opts.CredsBundle != nil:
   116  		creds = opts.CredsBundle.TransportCredentials()
   117  	}
   118  	if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
   119  		b.xdsCredsInUse = true
   120  	}
   121  	b.logger.Infof("xDS credentials in use: %v", b.xdsCredsInUse)
   122  	return b
   123  }
   124  
   125  // Name returns the name of balancers built by this builder.
   126  func (bb) Name() string {
   127  	return cdsName
   128  }
   129  
// lbConfig represents the loadBalancingConfig section of the service config
// for the cdsBalancer. It is produced by bb.ParseConfig and consumed by
// cdsBalancer.UpdateClientConnState.
type lbConfig struct {
	// Embedded so that *lbConfig can be returned as a
	// serviceconfig.LoadBalancingConfig.
	serviceconfig.LoadBalancingConfig
	// ClusterName is the top-level cluster to watch, decoded from the JSON
	// field "Cluster".
	ClusterName string `json:"Cluster"`
}
   136  
   137  // ParseConfig parses the JSON load balancer config provided into an
   138  // internal form or returns an error if the config is invalid.
   139  func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
   140  	var cfg lbConfig
   141  	if err := json.Unmarshal(c, &cfg); err != nil {
   142  		return nil, fmt.Errorf("xds: unable to unmarshal lbconfig: %s, error: %v", string(c), err)
   143  	}
   144  	return &cfg, nil
   145  }
   146  
// cdsBalancer implements a CDS based LB policy. It instantiates a
// cluster_resolver balancer to further resolve the serviceName received from
// CDS, into localities and endpoints. It implements the balancer.Balancer
// interface, which is exposed to gRPC. The cluster_resolver child is not given
// the cdsBalancer itself as its balancer.ClientConn; it is given a ccWrapper
// (the ccw field), which adds xDS handshake information to subConn addresses.
type cdsBalancer struct {
	// The following fields are initialized at build time and are either
	// read-only after that or provide their own synchronization, and therefore
	// do not need to be guarded by a mutex.
	ccw               *ccWrapper            // ClientConn interface passed to child LB.
	bOpts             balancer.BuildOptions // BuildOptions passed to child LB.
	childConfigParser balancer.ConfigParser // Config parser for cluster_resolver LB policy.
	logger            *grpclog.PrefixLogger // Prefix logger for all logging.
	xdsCredsInUse     bool                  // Set in Build if the transport credentials report UsesXDS() == true.

	// Points at the current *xdsinternal.HandshakeInfo. The same pointer is
	// shared with ccw, and the pointee is replaced with atomic.StorePointer
	// when security configuration is processed.
	xdsHIPtr *unsafe.Pointer // Accessed atomically.

	// The serializer and its cancel func are initialized at build time, and the
	// rest of the fields here are only accessed from serializer callbacks (or
	// from balancer.Balancer methods, which themselves are guaranteed to be
	// mutually exclusive) and hence do not need to be guarded by a mutex.
	serializer       *grpcsync.CallbackSerializer // Serializes updates from gRPC and xDS client.
	serializerCancel context.CancelFunc           // Stops the above serializer.
	childLB          balancer.Balancer            // Child policy, built upon resolution of the cluster graph.
	xdsClient        xdsclient.XDSClient          // xDS client to watch Cluster resources.
	watchers         map[string]*watcherState     // Set of watchers and associated state, keyed by cluster name.
	lbCfg            *lbConfig                    // Current load balancing configuration.

	// The certificate providers are cached here so that they can be closed
	// when a new provider is created and when the balancer is closed.
	cachedRoot     certprovider.Provider
	cachedIdentity certprovider.Provider
}
   180  
   181  // handleSecurityConfig processes the security configuration received from the
   182  // management server, creates appropriate certificate provider plugins, and
   183  // updates the HandshakeInfo which is added as an address attribute in
   184  // NewSubConn() calls.
   185  //
   186  // Only executed in the context of a serializer callback.
   187  func (b *cdsBalancer) handleSecurityConfig(config *xdsresource.SecurityConfig) error {
   188  	// If xdsCredentials are not in use, i.e, the user did not want to get
   189  	// security configuration from an xDS server, we should not be acting on the
   190  	// received security config here. Doing so poses a security threat.
   191  	if !b.xdsCredsInUse {
   192  		return nil
   193  	}
   194  	var xdsHI *xdsinternal.HandshakeInfo
   195  
   196  	// Security config being nil is a valid case where the management server has
   197  	// not sent any security configuration. The xdsCredentials implementation
   198  	// handles this by delegating to its fallback credentials.
   199  	if config == nil {
   200  		// We need to explicitly set the fields to nil here since this might be
   201  		// a case of switching from a good security configuration to an empty
   202  		// one where fallback credentials are to be used.
   203  		xdsHI = xdsinternal.NewHandshakeInfo(nil, nil, nil, false)
   204  		atomic.StorePointer(b.xdsHIPtr, unsafe.Pointer(xdsHI))
   205  		return nil
   206  
   207  	}
   208  
   209  	// A root provider is required whether we are using TLS or mTLS.
   210  	cpc := b.xdsClient.BootstrapConfig().CertProviderConfigs
   211  	rootProvider, err := buildProvider(cpc, config.RootInstanceName, config.RootCertName, false, true)
   212  	if err != nil {
   213  		return err
   214  	}
   215  
   216  	// The identity provider is only present when using mTLS.
   217  	var identityProvider certprovider.Provider
   218  	if name, cert := config.IdentityInstanceName, config.IdentityCertName; name != "" {
   219  		var err error
   220  		identityProvider, err = buildProvider(cpc, name, cert, true, false)
   221  		if err != nil {
   222  			return err
   223  		}
   224  	}
   225  
   226  	// Close the old providers and cache the new ones.
   227  	if b.cachedRoot != nil {
   228  		b.cachedRoot.Close()
   229  	}
   230  	if b.cachedIdentity != nil {
   231  		b.cachedIdentity.Close()
   232  	}
   233  	b.cachedRoot = rootProvider
   234  	b.cachedIdentity = identityProvider
   235  	xdsHI = xdsinternal.NewHandshakeInfo(rootProvider, identityProvider, config.SubjectAltNameMatchers, false)
   236  	atomic.StorePointer(b.xdsHIPtr, unsafe.Pointer(xdsHI))
   237  	return nil
   238  }
   239  
   240  func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanceName, certName string, wantIdentity, wantRoot bool) (certprovider.Provider, error) {
   241  	cfg, ok := configs[instanceName]
   242  	if !ok {
   243  		// Defensive programming. If a resource received from the management
   244  		// server contains a certificate provider instance name that is not
   245  		// found in the bootstrap, the resource is NACKed by the xDS client.
   246  		return nil, fmt.Errorf("certificate provider instance %q not found in bootstrap file", instanceName)
   247  	}
   248  	provider, err := cfg.Build(certprovider.BuildOptions{
   249  		CertName:     certName,
   250  		WantIdentity: wantIdentity,
   251  		WantRoot:     wantRoot,
   252  	})
   253  	if err != nil {
   254  		// This error is not expected since the bootstrap process parses the
   255  		// config and makes sure that it is acceptable to the plugin. Still, it
   256  		// is possible that the plugin parses the config successfully, but its
   257  		// Build() method errors out.
   258  		return nil, fmt.Errorf("xds: failed to get security plugin instance (%+v): %v", cfg, err)
   259  	}
   260  	return provider, nil
   261  }
   262  
   263  // A convenience method to create a watcher for cluster `name`. It also
   264  // registers the watch with the xDS client, and adds the newly created watcher
   265  // to the list of watchers maintained by the LB policy.
   266  func (b *cdsBalancer) createAndAddWatcherForCluster(name string) {
   267  	w := &clusterWatcher{
   268  		name:   name,
   269  		parent: b,
   270  	}
   271  	ws := &watcherState{
   272  		watcher:     w,
   273  		cancelWatch: xdsresource.WatchCluster(b.xdsClient, name, w),
   274  	}
   275  	b.watchers[name] = ws
   276  }
   277  
   278  // UpdateClientConnState receives the serviceConfig (which contains the
   279  // clusterName to watch for in CDS) and the xdsClient object from the
   280  // xdsResolver.
   281  func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
   282  	if b.xdsClient == nil {
   283  		c := xdsclient.FromResolverState(state.ResolverState)
   284  		if c == nil {
   285  			b.logger.Warningf("Received balancer config with no xDS client")
   286  			return balancer.ErrBadResolverState
   287  		}
   288  		b.xdsClient = c
   289  	}
   290  	b.logger.Infof("Received balancer config update: %s", pretty.ToJSON(state.BalancerConfig))
   291  
   292  	// The errors checked here should ideally never happen because the
   293  	// ServiceConfig in this case is prepared by the xdsResolver and is not
   294  	// something that is received on the wire.
   295  	lbCfg, ok := state.BalancerConfig.(*lbConfig)
   296  	if !ok {
   297  		b.logger.Warningf("Received unexpected balancer config type: %T", state.BalancerConfig)
   298  		return balancer.ErrBadResolverState
   299  	}
   300  	if lbCfg.ClusterName == "" {
   301  		b.logger.Warningf("Received balancer config with no cluster name")
   302  		return balancer.ErrBadResolverState
   303  	}
   304  
   305  	// Do nothing and return early if configuration has not changed.
   306  	if b.lbCfg != nil && b.lbCfg.ClusterName == lbCfg.ClusterName {
   307  		return nil
   308  	}
   309  	b.lbCfg = lbCfg
   310  
   311  	// Handle the update in a blocking fashion.
   312  	done := make(chan struct{})
   313  	ok = b.serializer.Schedule(func(context.Context) {
   314  		// A config update with a changed top-level cluster name means that none
   315  		// of our old watchers make any sense any more.
   316  		b.closeAllWatchers()
   317  
   318  		// Create a new watcher for the top-level cluster. Upon resolution, it
   319  		// could end up creating more watchers if turns out to be an aggregate
   320  		// cluster.
   321  		b.createAndAddWatcherForCluster(lbCfg.ClusterName)
   322  		close(done)
   323  	})
   324  	if !ok {
   325  		// The call to Schedule returns false *only* if the serializer has been
   326  		// closed, which happens only when we receive an update after close.
   327  		return errBalancerClosed
   328  	}
   329  	<-done
   330  	return nil
   331  }
   332  
   333  // ResolverError handles errors reported by the xdsResolver.
   334  func (b *cdsBalancer) ResolverError(err error) {
   335  	b.serializer.Schedule(func(context.Context) {
   336  		// Resource not found error is reported by the resolver when the
   337  		// top-level cluster resource is removed by the management server.
   338  		if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
   339  			b.closeAllWatchers()
   340  		}
   341  		var root string
   342  		if b.lbCfg != nil {
   343  			root = b.lbCfg.ClusterName
   344  		}
   345  		b.onClusterError(root, err)
   346  	})
   347  }
   348  
// UpdateSubConnState handles subConn updates from gRPC.
//
// This balancer does not expect to receive such updates. ccWrapper.NewSubConn
// forwards NewSubConnOptions unchanged, so state updates presumably reach the
// child policy through the StateListener it set (TODO confirm). Any call that
// does arrive here is only logged as an error.
func (b *cdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}
   353  
   354  // Closes all registered cluster wathers and removes them from the internal map.
   355  //
   356  // Only executed in the context of a serializer callback.
   357  func (b *cdsBalancer) closeAllWatchers() {
   358  	for name, state := range b.watchers {
   359  		state.cancelWatch()
   360  		delete(b.watchers, name)
   361  	}
   362  }
   363  
   364  // Close cancels the CDS watch, closes the child policy and closes the
   365  // cdsBalancer.
   366  func (b *cdsBalancer) Close() {
   367  	b.serializer.Schedule(func(ctx context.Context) {
   368  		b.closeAllWatchers()
   369  
   370  		if b.childLB != nil {
   371  			b.childLB.Close()
   372  			b.childLB = nil
   373  		}
   374  		if b.cachedRoot != nil {
   375  			b.cachedRoot.Close()
   376  		}
   377  		if b.cachedIdentity != nil {
   378  			b.cachedIdentity.Close()
   379  		}
   380  		b.logger.Infof("Shutdown")
   381  	})
   382  	b.serializerCancel()
   383  	<-b.serializer.Done()
   384  }
   385  
   386  func (b *cdsBalancer) ExitIdle() {
   387  	b.serializer.Schedule(func(context.Context) {
   388  		if b.childLB == nil {
   389  			b.logger.Warningf("Received ExitIdle with no child policy")
   390  			return
   391  		}
   392  		// This implementation assumes the child balancer supports
   393  		// ExitIdle (but still checks for the interface's existence to
   394  		// avoid a panic if not).  If the child does not, no subconns
   395  		// will be connected.
   396  		if ei, ok := b.childLB.(balancer.ExitIdler); ok {
   397  			ei.ExitIdle()
   398  		}
   399  	})
   400  }
   401  
   402  // Handles a good Cluster update from the xDS client. Kicks off the discovery
   403  // mechanism generation process from the top-level cluster and if the cluster
   404  // graph is resolved, generates child policy config and pushes it down.
   405  //
   406  // Only executed in the context of a serializer callback.
   407  func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpdate) {
   408  	state := b.watchers[name]
   409  	if state == nil {
   410  		// We are currently not watching this cluster anymore. Return early.
   411  		return
   412  	}
   413  
   414  	b.logger.Infof("Received Cluster resource: %s", pretty.ToJSON(update))
   415  
   416  	// Update the watchers map with the update for the cluster.
   417  	state.lastUpdate = &update
   418  
   419  	// For an aggregate cluster, always use the security configuration on the
   420  	// root cluster.
   421  	if name == b.lbCfg.ClusterName {
   422  		// Process the security config from the received update before building the
   423  		// child policy or forwarding the update to it. We do this because the child
   424  		// policy may try to create a new subConn inline. Processing the security
   425  		// configuration here and setting up the handshakeInfo will make sure that
   426  		// such attempts are handled properly.
   427  		if err := b.handleSecurityConfig(update.SecurityCfg); err != nil {
   428  			// If the security config is invalid, for example, if the provider
   429  			// instance is not found in the bootstrap config, we need to put the
   430  			// channel in transient failure.
   431  			b.onClusterError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
   432  			return
   433  		}
   434  	}
   435  
   436  	clustersSeen := make(map[string]bool)
   437  	dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen)
   438  	if err != nil {
   439  		b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
   440  		return
   441  	}
   442  	if ok {
   443  		if len(dms) == 0 {
   444  			b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
   445  			return
   446  		}
   447  		// Child policy is built the first time we resolve the cluster graph.
   448  		if b.childLB == nil {
   449  			childLB, err := newChildBalancer(b.ccw, b.bOpts)
   450  			if err != nil {
   451  				b.logger.Errorf("Failed to create child policy of type %s: %v", clusterresolver.Name, err)
   452  				return
   453  			}
   454  			b.childLB = childLB
   455  			b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name)
   456  		}
   457  
   458  		// Prepare the child policy configuration, convert it to JSON, have it
   459  		// parsed by the child policy to convert it into service config and push
   460  		// an update to it.
   461  		childCfg := &clusterresolver.LBConfig{
   462  			DiscoveryMechanisms: dms,
   463  			// The LB policy is configured by the root cluster.
   464  			XDSLBPolicy: b.watchers[b.lbCfg.ClusterName].lastUpdate.LBPolicy,
   465  		}
   466  		cfgJSON, err := json.Marshal(childCfg)
   467  		if err != nil {
   468  			// Shouldn't happen, since we just prepared struct.
   469  			b.logger.Errorf("cds_balancer: error marshalling prepared config: %v", childCfg)
   470  			return
   471  		}
   472  
   473  		var sc serviceconfig.LoadBalancingConfig
   474  		if sc, err = b.childConfigParser.ParseConfig(cfgJSON); err != nil {
   475  			b.logger.Errorf("cds_balancer: cluster_resolver config generated %v is invalid: %v", string(cfgJSON), err)
   476  			return
   477  		}
   478  
   479  		ccState := balancer.ClientConnState{
   480  			ResolverState:  xdsclient.SetClient(resolver.State{}, b.xdsClient),
   481  			BalancerConfig: sc,
   482  		}
   483  		if err := b.childLB.UpdateClientConnState(ccState); err != nil {
   484  			b.logger.Errorf("Encountered error when sending config {%+v} to child policy: %v", ccState, err)
   485  		}
   486  	}
   487  	// We no longer need the clusters that we did not see in this iteration of
   488  	// generateDMsForCluster().
   489  	for cluster := range clustersSeen {
   490  		state, ok := b.watchers[cluster]
   491  		if ok {
   492  			continue
   493  		}
   494  		state.cancelWatch()
   495  		delete(b.watchers, cluster)
   496  	}
   497  }
   498  
   499  // Handles an error Cluster update from the xDS client. Propagates the error
   500  // down to the child policy if one exists, or puts the channel in
   501  // TRANSIENT_FAILURE.
   502  //
   503  // Only executed in the context of a serializer callback.
   504  func (b *cdsBalancer) onClusterError(name string, err error) {
   505  	b.logger.Warningf("Cluster resource %q received error update: %v", name, err)
   506  
   507  	if b.childLB != nil {
   508  		if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection {
   509  			// Connection errors will be sent to the child balancers directly.
   510  			// There's no need to forward them.
   511  			b.childLB.ResolverError(err)
   512  		}
   513  	} else {
   514  		// If child balancer was never created, fail the RPCs with
   515  		// errors.
   516  		b.ccw.UpdateState(balancer.State{
   517  			ConnectivityState: connectivity.TransientFailure,
   518  			Picker:            base.NewErrPicker(fmt.Errorf("%q: %v", name, err)),
   519  		})
   520  	}
   521  }
   522  
   523  // Handles a resource-not-found error from the xDS client. Propagates the error
   524  // down to the child policy if one exists, or puts the channel in
   525  // TRANSIENT_FAILURE.
   526  //
   527  // Only executed in the context of a serializer callback.
   528  func (b *cdsBalancer) onClusterResourceNotFound(name string) {
   529  	err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name)
   530  	if b.childLB != nil {
   531  		b.childLB.ResolverError(err)
   532  	} else {
   533  		// If child balancer was never created, fail the RPCs with errors.
   534  		b.ccw.UpdateState(balancer.State{
   535  			ConnectivityState: connectivity.TransientFailure,
   536  			Picker:            base.NewErrPicker(err),
   537  		})
   538  	}
   539  }
   540  
   541  // Generates discovery mechanisms for the cluster graph rooted at `name`. This
   542  // method is called recursively if `name` corresponds to an aggregate cluster,
   543  // with the base case for recursion being a leaf cluster. If a new cluster is
   544  // encountered when traversing the graph, a watcher is created for it.
   545  //
   546  // Inputs:
   547  // - name: name of the cluster to start from
   548  // - depth: recursion depth of the current cluster, starting from root
   549  // - dms: prioritized list of current discovery mechanisms
   550  // - clustersSeen: cluster names seen so far in the graph traversal
   551  //
   552  // Outputs:
   553  //   - new prioritized list of discovery mechanisms
   554  //   - boolean indicating if traversal of the aggregate cluster graph is
   555  //     complete. If false, the above list of discovery mechanisms is ignored.
   556  //   - error indicating if any error was encountered as part of the graph
   557  //     traversal. If error is non-nil, the other return values are ignored.
   558  //
   559  // Only executed in the context of a serializer callback.
   560  func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []clusterresolver.DiscoveryMechanism, clustersSeen map[string]bool) ([]clusterresolver.DiscoveryMechanism, bool, error) {
   561  	if depth >= aggregateClusterMaxDepth {
   562  		return dms, false, errExceedsMaxDepth
   563  	}
   564  
   565  	if clustersSeen[name] {
   566  		// Discovery mechanism already seen through a different branch.
   567  		return dms, true, nil
   568  	}
   569  	clustersSeen[name] = true
   570  
   571  	state, ok := b.watchers[name]
   572  	if !ok {
   573  		// If we have not seen this cluster so far, create a watcher for it, add
   574  		// it to the map, start the watch and return.
   575  		b.createAndAddWatcherForCluster(name)
   576  
   577  		// And since we just created the watcher, we know that we haven't
   578  		// resolved the cluster graph yet.
   579  		return dms, false, nil
   580  	}
   581  
   582  	// A watcher exists, but no update has been received yet.
   583  	if state.lastUpdate == nil {
   584  		return dms, false, nil
   585  	}
   586  
   587  	var dm clusterresolver.DiscoveryMechanism
   588  	cluster := state.lastUpdate
   589  	switch cluster.ClusterType {
   590  	case xdsresource.ClusterTypeAggregate:
   591  		// This boolean is used to track if any of the clusters in the graph is
   592  		// not yet completely resolved or returns errors, thereby allowing us to
   593  		// traverse as much of the graph as possible (and start the associated
   594  		// watches where required) to ensure that clustersSeen contains all
   595  		// clusters in the graph that we can traverse to.
   596  		missingCluster := false
   597  		var err error
   598  		for _, child := range cluster.PrioritizedClusterNames {
   599  			var ok bool
   600  			dms, ok, err = b.generateDMsForCluster(child, depth+1, dms, clustersSeen)
   601  			if err != nil || !ok {
   602  				missingCluster = true
   603  			}
   604  		}
   605  		return dms, !missingCluster, err
   606  	case xdsresource.ClusterTypeEDS:
   607  		dm = clusterresolver.DiscoveryMechanism{
   608  			Type:                  clusterresolver.DiscoveryMechanismTypeEDS,
   609  			Cluster:               cluster.ClusterName,
   610  			EDSServiceName:        cluster.EDSServiceName,
   611  			MaxConcurrentRequests: cluster.MaxRequests,
   612  			LoadReportingServer:   cluster.LRSServerConfig,
   613  		}
   614  	case xdsresource.ClusterTypeLogicalDNS:
   615  		dm = clusterresolver.DiscoveryMechanism{
   616  			Type:        clusterresolver.DiscoveryMechanismTypeLogicalDNS,
   617  			Cluster:     cluster.ClusterName,
   618  			DNSHostname: cluster.DNSHostName,
   619  		}
   620  	}
   621  	odJSON := cluster.OutlierDetection
   622  	// "In the cds LB policy, if the outlier_detection field is not set in
   623  	// the Cluster resource, a "no-op" outlier_detection config will be
   624  	// generated in the corresponding DiscoveryMechanism config, with all
   625  	// fields unset." - A50
   626  	if odJSON == nil {
   627  		// This will pick up top level defaults in Cluster Resolver
   628  		// ParseConfig, but sre and fpe will be nil still so still a
   629  		// "no-op" config.
   630  		odJSON = json.RawMessage(`{}`)
   631  	}
   632  	dm.OutlierDetection = odJSON
   633  
   634  	dm.TelemetryLabels = cluster.TelemetryLabels
   635  
   636  	return append(dms, dm), true, nil
   637  }
   638  
// ccWrapper wraps the balancer.ClientConn passed to the CDS balancer at
// creation and intercepts the NewSubConn() and UpdateAddresses() calls from the
// child policy to add security configuration required by xDS credentials.
//
// Other methods of the balancer.ClientConn interface are not overridden and
// hence get the original implementation.
type ccWrapper struct {
	balancer.ClientConn

	// xdsHIPtr is the same pointer held by the owning cdsBalancer. It points
	// at the current HandshakeInfo, which the balancer replaces atomically
	// when security configuration changes.
	xdsHIPtr *unsafe.Pointer
}
   650  
   651  // NewSubConn intercepts NewSubConn() calls from the child policy and adds an
   652  // address attribute which provides all information required by the xdsCreds
   653  // handshaker to perform the TLS handshake.
   654  func (ccw *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
   655  	newAddrs := make([]resolver.Address, len(addrs))
   656  	for i, addr := range addrs {
   657  		newAddrs[i] = xdsinternal.SetHandshakeInfo(addr, ccw.xdsHIPtr)
   658  	}
   659  
   660  	// No need to override opts.StateListener; just forward all calls to the
   661  	// child that created the SubConn.
   662  	return ccw.ClientConn.NewSubConn(newAddrs, opts)
   663  }
   664  
   665  func (ccw *ccWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
   666  	newAddrs := make([]resolver.Address, len(addrs))
   667  	for i, addr := range addrs {
   668  		newAddrs[i] = xdsinternal.SetHandshakeInfo(addr, ccw.xdsHIPtr)
   669  	}
   670  	ccw.ClientConn.UpdateAddresses(sc, newAddrs)
   671  }
   672  

View as plain text