/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package cdsbalancer implements a balancer to handle CDS responses.
package cdsbalancer

import (
	"context"
	"encoding/json"
	"fmt"
	"sync/atomic"
	"unsafe"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/tls/certprovider"
	"google.golang.org/grpc/internal/balancer/nop"
	xdsinternal "google.golang.org/grpc/internal/credentials/xds"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/pretty"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
	"google.golang.org/grpc/xds/internal/xdsclient"
	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

const (
	cdsName                  = "cds_experimental"
	aggregateClusterMaxDepth = 16
)

var (
	errBalancerClosed  = fmt.Errorf("cds_experimental LB policy is closed")
	errExceedsMaxDepth = fmt.Errorf("aggregate cluster graph exceeds max depth (%d)", aggregateClusterMaxDepth)

	// newChildBalancer is a helper function to build a new cluster_resolver
	// balancer and will be overridden in unittests.
	newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
		builder := balancer.Get(clusterresolver.Name)
		if builder == nil {
			return nil, fmt.Errorf("xds: no balancer builder with name %v", clusterresolver.Name)
		}
		// We directly pass the parent clientConn to the underlying
		// cluster_resolver balancer because the cdsBalancer does not deal with
		// subConns.
		return builder.Build(cc, opts), nil
	}

	buildProvider = buildProviderFunc
)

func init() {
	balancer.Register(bb{})
}

// bb implements the balancer.Builder interface to help build a cdsBalancer.
// It also implements the balancer.ConfigParser interface to help parse the
// JSON service config, to be passed to the cdsBalancer.
type bb struct{}

// Build creates a new CDS balancer with the ClientConn.
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	builder := balancer.Get(clusterresolver.Name)
	if builder == nil {
		// Shouldn't happen, registered through imported Cluster Resolver,
		// defensive programming.
		logger.Errorf("%q LB policy is needed but not registered", clusterresolver.Name)
		return nop.NewBalancer(cc, fmt.Errorf("%q LB policy is needed but not registered", clusterresolver.Name))
	}
	parser, ok := builder.(balancer.ConfigParser)
	if !ok {
		// Shouldn't happen, imported Cluster Resolver builder has this method.
logger.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name) return nop.NewBalancer(cc, fmt.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name)) } ctx, cancel := context.WithCancel(context.Background()) hi := xdsinternal.NewHandshakeInfo(nil, nil, nil, false) xdsHIPtr := unsafe.Pointer(hi) b := &cdsBalancer{ bOpts: opts, childConfigParser: parser, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, xdsHIPtr: &xdsHIPtr, watchers: make(map[string]*watcherState), } b.ccw = &ccWrapper{ ClientConn: cc, xdsHIPtr: b.xdsHIPtr, } b.logger = prefixLogger(b) b.logger.Infof("Created") var creds credentials.TransportCredentials switch { case opts.DialCreds != nil: creds = opts.DialCreds case opts.CredsBundle != nil: creds = opts.CredsBundle.TransportCredentials() } if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() { b.xdsCredsInUse = true } b.logger.Infof("xDS credentials in use: %v", b.xdsCredsInUse) return b } // Name returns the name of balancers built by this builder. func (bb) Name() string { return cdsName } // lbConfig represents the loadBalancingConfig section of the service config // for the cdsBalancer. type lbConfig struct { serviceconfig.LoadBalancingConfig ClusterName string `json:"Cluster"` } // ParseConfig parses the JSON load balancer config provided into an // internal form or returns an error if the config is invalid. func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { var cfg lbConfig if err := json.Unmarshal(c, &cfg); err != nil { return nil, fmt.Errorf("xds: unable to unmarshal lbconfig: %s, error: %v", string(c), err) } return &cfg, nil } // cdsBalancer implements a CDS based LB policy. It instantiates a // cluster_resolver balancer to further resolve the serviceName received from // CDS, into localities and endpoints. Implements the balancer.Balancer // interface which is exposed to gRPC and implements the balancer.ClientConn // interface which is exposed to the cluster_resolver balancer. type cdsBalancer struct { // The following fields are initialized at build time and are either // read-only after that or provide their own synchronization, and therefore // do not need to be guarded by a mutex. ccw *ccWrapper // ClientConn interface passed to child LB. bOpts balancer.BuildOptions // BuildOptions passed to child LB. childConfigParser balancer.ConfigParser // Config parser for cluster_resolver LB policy. logger *grpclog.PrefixLogger // Prefix logger for all logging. xdsCredsInUse bool xdsHIPtr *unsafe.Pointer // Accessed atomically. // The serializer and its cancel func are initialized at build time, and the // rest of the fields here are only accessed from serializer callbacks (or // from balancer.Balancer methods, which themselves are guaranteed to be // mutually exclusive) and hence do not need to be guarded by a mutex. serializer *grpcsync.CallbackSerializer // Serializes updates from gRPC and xDS client. serializerCancel context.CancelFunc // Stops the above serializer. childLB balancer.Balancer // Child policy, built upon resolution of the cluster graph. xdsClient xdsclient.XDSClient // xDS client to watch Cluster resources. watchers map[string]*watcherState // Set of watchers and associated state, keyed by cluster name. lbCfg *lbConfig // Current load balancing configuration. // The certificate providers are cached here to that they can be closed when // a new provider is to be created. 
	cachedRoot     certprovider.Provider
	cachedIdentity certprovider.Provider
}

// handleSecurityConfig processes the security configuration received from the
// management server, creates appropriate certificate provider plugins, and
// updates the HandshakeInfo which is added as an address attribute in
// NewSubConn() calls.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) handleSecurityConfig(config *xdsresource.SecurityConfig) error {
	// If xdsCredentials are not in use, i.e., the user did not want to get
	// security configuration from an xDS server, we should not be acting on
	// the received security config here. Doing so poses a security threat.
	if !b.xdsCredsInUse {
		return nil
	}
	var xdsHI *xdsinternal.HandshakeInfo

	// Security config being nil is a valid case where the management server has
	// not sent any security configuration. The xdsCredentials implementation
	// handles this by delegating to its fallback credentials.
	if config == nil {
		// We need to explicitly set the fields to nil here since this might be
		// a case of switching from a good security configuration to an empty
		// one where fallback credentials are to be used.
		xdsHI = xdsinternal.NewHandshakeInfo(nil, nil, nil, false)
		atomic.StorePointer(b.xdsHIPtr, unsafe.Pointer(xdsHI))
		return nil
	}

	// A root provider is required whether we are using TLS or mTLS.
	cpc := b.xdsClient.BootstrapConfig().CertProviderConfigs
	rootProvider, err := buildProvider(cpc, config.RootInstanceName, config.RootCertName, false, true)
	if err != nil {
		return err
	}

	// The identity provider is only present when using mTLS.
	var identityProvider certprovider.Provider
	if name, cert := config.IdentityInstanceName, config.IdentityCertName; name != "" {
		var err error
		identityProvider, err = buildProvider(cpc, name, cert, true, false)
		if err != nil {
			return err
		}
	}

	// Close the old providers and cache the new ones.
	if b.cachedRoot != nil {
		b.cachedRoot.Close()
	}
	if b.cachedIdentity != nil {
		b.cachedIdentity.Close()
	}
	b.cachedRoot = rootProvider
	b.cachedIdentity = identityProvider

	xdsHI = xdsinternal.NewHandshakeInfo(rootProvider, identityProvider, config.SubjectAltNameMatchers, false)
	atomic.StorePointer(b.xdsHIPtr, unsafe.Pointer(xdsHI))
	return nil
}

func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanceName, certName string, wantIdentity, wantRoot bool) (certprovider.Provider, error) {
	cfg, ok := configs[instanceName]
	if !ok {
		// Defensive programming. If a resource received from the management
		// server contains a certificate provider instance name that is not
		// found in the bootstrap, the resource is NACKed by the xDS client.
		return nil, fmt.Errorf("certificate provider instance %q not found in bootstrap file", instanceName)
	}
	provider, err := cfg.Build(certprovider.BuildOptions{
		CertName:     certName,
		WantIdentity: wantIdentity,
		WantRoot:     wantRoot,
	})
	if err != nil {
		// This error is not expected since the bootstrap process parses the
		// config and makes sure that it is acceptable to the plugin. Still, it
		// is possible that the plugin parses the config successfully, but its
		// Build() method errors out.
		return nil, fmt.Errorf("xds: failed to get security plugin instance (%+v): %v", cfg, err)
	}
	return provider, nil
}

// A convenience method to create a watcher for cluster `name`. It also
// registers the watch with the xDS client, and adds the newly created watcher
// to the list of watchers maintained by the LB policy.
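//
// Only executed in the context of a serializer callback: both call sites
// (UpdateClientConnState and generateDMsForCluster) run on the serializer.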
func (b *cdsBalancer) createAndAddWatcherForCluster(name string) {
	w := &clusterWatcher{
		name:   name,
		parent: b,
	}
	ws := &watcherState{
		watcher:     w,
		cancelWatch: xdsresource.WatchCluster(b.xdsClient, name, w),
	}
	b.watchers[name] = ws
}

// UpdateClientConnState receives the serviceConfig (which contains the
// clusterName to watch for in CDS) and the xdsClient object from the
// xdsResolver.
func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
	if b.xdsClient == nil {
		c := xdsclient.FromResolverState(state.ResolverState)
		if c == nil {
			b.logger.Warningf("Received balancer config with no xDS client")
			return balancer.ErrBadResolverState
		}
		b.xdsClient = c
	}
	b.logger.Infof("Received balancer config update: %s", pretty.ToJSON(state.BalancerConfig))

	// The errors checked here should ideally never happen because the
	// ServiceConfig in this case is prepared by the xdsResolver and is not
	// something that is received on the wire.
	lbCfg, ok := state.BalancerConfig.(*lbConfig)
	if !ok {
		b.logger.Warningf("Received unexpected balancer config type: %T", state.BalancerConfig)
		return balancer.ErrBadResolverState
	}
	if lbCfg.ClusterName == "" {
		b.logger.Warningf("Received balancer config with no cluster name")
		return balancer.ErrBadResolverState
	}

	// Do nothing and return early if the configuration has not changed.
	if b.lbCfg != nil && b.lbCfg.ClusterName == lbCfg.ClusterName {
		return nil
	}
	b.lbCfg = lbCfg

	// Handle the update in a blocking fashion.
	done := make(chan struct{})
	ok = b.serializer.Schedule(func(context.Context) {
		// A config update with a changed top-level cluster name means that
		// none of our old watchers make any sense any more.
		b.closeAllWatchers()

		// Create a new watcher for the top-level cluster. Upon resolution, it
		// could end up creating more watchers if it turns out to be an
		// aggregate cluster.
		b.createAndAddWatcherForCluster(lbCfg.ClusterName)
		close(done)
	})
	if !ok {
		// The call to Schedule returns false *only* if the serializer has been
		// closed, which happens only when we receive an update after close.
		return errBalancerClosed
	}
	<-done
	return nil
}

// ResolverError handles errors reported by the xdsResolver.
func (b *cdsBalancer) ResolverError(err error) {
	b.serializer.Schedule(func(context.Context) {
		// A resource-not-found error is reported by the resolver when the
		// top-level cluster resource is removed by the management server.
		if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
			b.closeAllWatchers()
		}
		var root string
		if b.lbCfg != nil {
			root = b.lbCfg.ClusterName
		}
		b.onClusterError(root, err)
	})
}

// UpdateSubConnState handles subConn updates from gRPC.
func (b *cdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

// Closes all registered cluster watchers and removes them from the internal
// map.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) closeAllWatchers() {
	for name, state := range b.watchers {
		state.cancelWatch()
		delete(b.watchers, name)
	}
}

// Close cancels the CDS watch, closes the child policy and closes the
// cdsBalancer.
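//
// It cancels the serializer and waits for it to drain, so no serializer
// callbacks run after Close returns.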
func (b *cdsBalancer) Close() {
	b.serializer.Schedule(func(ctx context.Context) {
		b.closeAllWatchers()

		if b.childLB != nil {
			b.childLB.Close()
			b.childLB = nil
		}
		if b.cachedRoot != nil {
			b.cachedRoot.Close()
		}
		if b.cachedIdentity != nil {
			b.cachedIdentity.Close()
		}
		b.logger.Infof("Shutdown")
	})
	b.serializerCancel()
	<-b.serializer.Done()
}

func (b *cdsBalancer) ExitIdle() {
	b.serializer.Schedule(func(context.Context) {
		if b.childLB == nil {
			b.logger.Warningf("Received ExitIdle with no child policy")
			return
		}
		// This implementation assumes the child balancer supports
		// ExitIdle (but still checks for the interface's existence to
		// avoid a panic if not). If the child does not, no subconns
		// will be connected.
		if ei, ok := b.childLB.(balancer.ExitIdler); ok {
			ei.ExitIdle()
		}
	})
}

// Handles a good Cluster update from the xDS client. Kicks off the discovery
// mechanism generation process from the top-level cluster and, if the cluster
// graph is resolved, generates child policy config and pushes it down.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpdate) {
	state := b.watchers[name]
	if state == nil {
		// We are no longer watching this cluster. Return early.
		return
	}
	b.logger.Infof("Received Cluster resource: %s", pretty.ToJSON(update))

	// Update the watchers map with the update for the cluster.
	state.lastUpdate = &update

	// For an aggregate cluster, always use the security configuration on the
	// root cluster.
	if name == b.lbCfg.ClusterName {
		// Process the security config from the received update before building
		// the child policy or forwarding the update to it. We do this because
		// the child policy may try to create a new subConn inline. Processing
		// the security configuration here and setting up the handshakeInfo
		// will make sure that such attempts are handled properly.
		if err := b.handleSecurityConfig(update.SecurityCfg); err != nil {
			// If the security config is invalid, for example, if the provider
			// instance is not found in the bootstrap config, we need to put
			// the channel in transient failure.
			b.onClusterError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
			return
		}
	}

	clustersSeen := make(map[string]bool)
	dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen)
	if err != nil {
		b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
		return
	}
	if ok {
		if len(dms) == 0 {
			b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
			return
		}
		// The child policy is built the first time we resolve the cluster
		// graph.
		if b.childLB == nil {
			childLB, err := newChildBalancer(b.ccw, b.bOpts)
			if err != nil {
				b.logger.Errorf("Failed to create child policy of type %s: %v", clusterresolver.Name, err)
				return
			}
			b.childLB = childLB
			b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name)
		}

		// Prepare the child policy configuration, convert it to JSON, have it
		// parsed by the child policy to convert it into service config and
		// push an update to it.
		childCfg := &clusterresolver.LBConfig{
			DiscoveryMechanisms: dms,
			// The LB policy is configured by the root cluster.
			XDSLBPolicy: b.watchers[b.lbCfg.ClusterName].lastUpdate.LBPolicy,
		}
		cfgJSON, err := json.Marshal(childCfg)
		if err != nil {
			// Shouldn't happen, since we just prepared the struct.
b.logger.Errorf("cds_balancer: error marshalling prepared config: %v", childCfg) return } var sc serviceconfig.LoadBalancingConfig if sc, err = b.childConfigParser.ParseConfig(cfgJSON); err != nil { b.logger.Errorf("cds_balancer: cluster_resolver config generated %v is invalid: %v", string(cfgJSON), err) return } ccState := balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient), BalancerConfig: sc, } if err := b.childLB.UpdateClientConnState(ccState); err != nil { b.logger.Errorf("Encountered error when sending config {%+v} to child policy: %v", ccState, err) } } // We no longer need the clusters that we did not see in this iteration of // generateDMsForCluster(). for cluster := range clustersSeen { state, ok := b.watchers[cluster] if ok { continue } state.cancelWatch() delete(b.watchers, cluster) } } // Handles an error Cluster update from the xDS client. Propagates the error // down to the child policy if one exists, or puts the channel in // TRANSIENT_FAILURE. // // Only executed in the context of a serializer callback. func (b *cdsBalancer) onClusterError(name string, err error) { b.logger.Warningf("Cluster resource %q received error update: %v", name, err) if b.childLB != nil { if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection { // Connection errors will be sent to the child balancers directly. // There's no need to forward them. b.childLB.ResolverError(err) } } else { // If child balancer was never created, fail the RPCs with // errors. b.ccw.UpdateState(balancer.State{ ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("%q: %v", name, err)), }) } } // Handles a resource-not-found error from the xDS client. Propagates the error // down to the child policy if one exists, or puts the channel in // TRANSIENT_FAILURE. // // Only executed in the context of a serializer callback. func (b *cdsBalancer) onClusterResourceNotFound(name string) { err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name) if b.childLB != nil { b.childLB.ResolverError(err) } else { // If child balancer was never created, fail the RPCs with errors. b.ccw.UpdateState(balancer.State{ ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(err), }) } } // Generates discovery mechanisms for the cluster graph rooted at `name`. This // method is called recursively if `name` corresponds to an aggregate cluster, // with the base case for recursion being a leaf cluster. If a new cluster is // encountered when traversing the graph, a watcher is created for it. // // Inputs: // - name: name of the cluster to start from // - depth: recursion depth of the current cluster, starting from root // - dms: prioritized list of current discovery mechanisms // - clustersSeen: cluster names seen so far in the graph traversal // // Outputs: // - new prioritized list of discovery mechanisms // - boolean indicating if traversal of the aggregate cluster graph is // complete. If false, the above list of discovery mechanisms is ignored. // - error indicating if any error was encountered as part of the graph // traversal. If error is non-nil, the other return values are ignored. // // Only executed in the context of a serializer callback. 
func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []clusterresolver.DiscoveryMechanism, clustersSeen map[string]bool) ([]clusterresolver.DiscoveryMechanism, bool, error) {
	if depth >= aggregateClusterMaxDepth {
		return dms, false, errExceedsMaxDepth
	}

	if clustersSeen[name] {
		// Discovery mechanism already seen through a different branch.
		return dms, true, nil
	}
	clustersSeen[name] = true

	state, ok := b.watchers[name]
	if !ok {
		// If we have not seen this cluster so far, create a watcher for it,
		// add it to the map, start the watch and return.
		b.createAndAddWatcherForCluster(name)

		// And since we just created the watcher, we know that we haven't
		// resolved the cluster graph yet.
		return dms, false, nil
	}

	// A watcher exists, but no update has been received yet.
	if state.lastUpdate == nil {
		return dms, false, nil
	}

	var dm clusterresolver.DiscoveryMechanism
	cluster := state.lastUpdate
	switch cluster.ClusterType {
	case xdsresource.ClusterTypeAggregate:
		// This boolean is used to track if any of the clusters in the graph is
		// not yet completely resolved or returns errors, thereby allowing us
		// to traverse as much of the graph as possible (and start the
		// associated watches where required) to ensure that clustersSeen
		// contains all clusters in the graph that we can traverse to.
		missingCluster := false
		var err error
		for _, child := range cluster.PrioritizedClusterNames {
			var ok bool
			dms, ok, err = b.generateDMsForCluster(child, depth+1, dms, clustersSeen)
			if err != nil || !ok {
				missingCluster = true
			}
		}
		return dms, !missingCluster, err
	case xdsresource.ClusterTypeEDS:
		dm = clusterresolver.DiscoveryMechanism{
			Type:                  clusterresolver.DiscoveryMechanismTypeEDS,
			Cluster:               cluster.ClusterName,
			EDSServiceName:        cluster.EDSServiceName,
			MaxConcurrentRequests: cluster.MaxRequests,
			LoadReportingServer:   cluster.LRSServerConfig,
		}
	case xdsresource.ClusterTypeLogicalDNS:
		dm = clusterresolver.DiscoveryMechanism{
			Type:        clusterresolver.DiscoveryMechanismTypeLogicalDNS,
			Cluster:     cluster.ClusterName,
			DNSHostname: cluster.DNSHostName,
		}
	}

	odJSON := cluster.OutlierDetection
	// "In the cds LB policy, if the outlier_detection field is not set in
	// the Cluster resource, a "no-op" outlier_detection config will be
	// generated in the corresponding DiscoveryMechanism config, with all
	// fields unset." - A50
	if odJSON == nil {
		// This will pick up top-level defaults in Cluster Resolver
		// ParseConfig, but sre and fpe will still be nil, so it remains a
		// "no-op" config.
		odJSON = json.RawMessage(`{}`)
	}
	dm.OutlierDetection = odJSON

	dm.TelemetryLabels = cluster.TelemetryLabels

	return append(dms, dm), true, nil
}

// ccWrapper wraps the balancer.ClientConn passed to the CDS balancer at
// creation and intercepts the NewSubConn() and UpdateAddresses() calls from
// the child policy to add security configuration required by xDS credentials.
//
// Other methods of the balancer.ClientConn interface are not overridden and
// hence get the original implementation.
type ccWrapper struct {
	balancer.ClientConn

	xdsHIPtr *unsafe.Pointer
}

// NewSubConn intercepts NewSubConn() calls from the child policy and adds an
// address attribute which provides all information required by the xdsCreds
// handshaker to perform the TLS handshake.
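//
// The attribute carries a pointer to the HandshakeInfo that is swapped
// atomically by handleSecurityConfig(), so handshakes on these SubConns
// observe the most recently received security configuration.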
func (ccw *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	newAddrs := make([]resolver.Address, len(addrs))
	for i, addr := range addrs {
		newAddrs[i] = xdsinternal.SetHandshakeInfo(addr, ccw.xdsHIPtr)
	}

	// No need to override opts.StateListener; just forward all calls to the
	// child that created the SubConn.
	return ccw.ClientConn.NewSubConn(newAddrs, opts)
}

func (ccw *ccWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
	newAddrs := make([]resolver.Address, len(addrs))
	for i, addr := range addrs {
		newAddrs[i] = xdsinternal.SetHandshakeInfo(addr, ccw.xdsHIPtr)
	}
	ccw.ClientConn.UpdateAddresses(sc, newAddrs)
}
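
// For reference, an illustrative loadBalancingConfig service config snippet
// that selects this LB policy ("cluster-foo" is a placeholder cluster name);
// ParseConfig above receives just the inner object:
//
//	{
//	  "loadBalancingConfig": [
//	    {"cds_experimental": {"Cluster": "cluster-foo"}}
//	  ]
//	}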