
Source file src/google.golang.org/grpc/balancer/rls/control_channel.go

Documentation: google.golang.org/grpc/balancer/rls

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    19  package rls
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"time"
    26  	"google.golang.org/grpc"
    27  	"google.golang.org/grpc/balancer"
    28  	"google.golang.org/grpc/balancer/rls/internal/adaptive"
    29  	"google.golang.org/grpc/connectivity"
    30  	"google.golang.org/grpc/credentials/insecure"
    31  	"google.golang.org/grpc/internal"
    32  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    33  	"google.golang.org/grpc/internal/pretty"
    34  	rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
    35  	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
    36  )
    38  var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }
    40  type adaptiveThrottler interface {
    41  	ShouldThrottle() bool
    42  	RegisterBackendResponse(throttled bool)
    43  }
    45  // controlChannel is a wrapper around the gRPC channel to the RLS server
    46  // specified in the service config.
    47  type controlChannel struct {
    48  	// rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB
    49  	// policy receives this value in its service config.
    50  	rpcTimeout time.Duration
    51  	// backToReadyFunc is a callback to be invoked when the connectivity state
    52  	// changes from READY --> TRANSIENT_FAILURE --> READY.
    53  	backToReadyFunc func()
    54  	// throttler in an adaptive throttling implementation used to avoid
    55  	// hammering the RLS service while it is overloaded or down.
    56  	throttler adaptiveThrottler
    58  	cc     *grpc.ClientConn
    59  	client rlsgrpc.RouteLookupServiceClient
    60  	logger *internalgrpclog.PrefixLogger
    61  }
    63  // newControlChannel creates a controlChannel to rlsServerName and uses
    64  // serviceConfig, if non-empty, as the default service config for the underlying
    65  // gRPC channel.
    66  func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
    67  	ctrlCh := &controlChannel{
    68  		rpcTimeout:      rpcTimeout,
    69  		backToReadyFunc: backToReadyFunc,
    70  		throttler:       newAdaptiveThrottler(),
    71  	}
    72  	ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
    74  	dopts, err := ctrlCh.dialOpts(bOpts, serviceConfig)
    75  	if err != nil {
    76  		return nil, err
    77  	}
    78  	ctrlCh.cc, err = grpc.Dial(rlsServerName, dopts...)
    79  	if err != nil {
    80  		return nil, err
    81  	}
    82  	ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
    83  	ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
    85  	go ctrlCh.monitorConnectivityState()
    86  	return ctrlCh, nil
    87  }
    89  // dialOpts constructs the dial options for the control plane channel.
    90  func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig string) ([]grpc.DialOption, error) {
    91  	// The control plane channel will use the same authority as the parent
    92  	// channel for server authorization. This ensures that the identity of the
    93  	// RLS server and the identity of the backends is the same, so if the RLS
    94  	// config is injected by an attacker, it cannot cause leakage of private
    95  	// information contained in headers set by the application.
    96  	dopts := []grpc.DialOption{grpc.WithAuthority(bOpts.Authority)}
    97  	if bOpts.Dialer != nil {
    98  		dopts = append(dopts, grpc.WithContextDialer(bOpts.Dialer))
    99  	}
   101  	// The control channel will use the channel credentials from the parent
   102  	// channel, including any call creds associated with the channel creds.
   103  	var credsOpt grpc.DialOption
   104  	switch {
   105  	case bOpts.DialCreds != nil:
   106  		credsOpt = grpc.WithTransportCredentials(bOpts.DialCreds.Clone())
   107  	case bOpts.CredsBundle != nil:
   108  		// The "fallback" mode in google default credentials (which is the only
   109  		// type of credentials we expect to be used with RLS) uses TLS/ALTS
   110  		// creds for transport and uses the same call creds as that on the
   111  		// parent bundle.
   112  		bundle, err := bOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
   113  		if err != nil {
   114  			return nil, err
   115  		}
   116  		credsOpt = grpc.WithCredentialsBundle(bundle)
   117  	default:
   118  		cc.logger.Warningf("no credentials available, using Insecure")
   119  		credsOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
   120  	}
   121  	dopts = append(dopts, credsOpt)
   123  	// If the RLS LB policy's configuration specified a service config for the
   124  	// control channel, use that and disable service config fetching via the name
   125  	// resolver for the control channel.
   126  	if serviceConfig != "" {
   127  		cc.logger.Infof("Disabling service config from the name resolver and instead using: %s", serviceConfig)
   128  		dopts = append(dopts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(serviceConfig))
   129  	}
   131  	return dopts, nil
   132  }
   134  func (cc *controlChannel) monitorConnectivityState() {
   135  	cc.logger.Infof("Starting connectivity state monitoring goroutine")
   136  	// Since we use two mechanisms to deal with RLS server being down:
   137  	//   - adaptive throttling for the channel as a whole
   138  	//   - exponential backoff on a per-request basis
   139  	// we need a way to avoid double-penalizing requests by counting failures
   140  	// toward both mechanisms when the RLS server is unreachable.
   141  	//
   142  	// To accomplish this, we monitor the state of the control plane channel. If
   143  	// the state has been TRANSIENT_FAILURE since the last time it was in state
   144  	// READY, and it then transitions into state READY, we push on a channel
   145  	// which is being read by the LB policy.
   146  	//
   147  	// The LB the policy will iterate through the cache to reset the backoff
   148  	// timeouts in all cache entries. Specifically, this means that it will
   149  	// reset the backoff state and cancel the pending backoff timer. Note that
   150  	// when cancelling the backoff timer, just like when the backoff timer fires
   151  	// normally, a new picker is returned to the channel, to force it to
   152  	// re-process any wait-for-ready RPCs that may still be queued if we failed
   153  	// them while we were in backoff. However, we should optimize this case by
   154  	// returning only one new picker, regardless of how many backoff timers are
   155  	// cancelled.
   157  	// Using the background context is fine here since we check for the ClientConn
   158  	// entering SHUTDOWN and return early in that case.
   159  	ctx := context.Background()
   161  	first := true
   162  	for {
   163  		// Wait for the control channel to become READY.
   164  		for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
   165  			if s == connectivity.Shutdown {
   166  				return
   167  			}
   168  			cc.cc.WaitForStateChange(ctx, s)
   169  		}
   170  		cc.logger.Infof("Connectivity state is READY")
   172  		if !first {
   173  			cc.logger.Infof("Control channel back to READY")
   174  			cc.backToReadyFunc()
   175  		}
   176  		first = false
   178  		// Wait for the control channel to move out of READY.
   179  		cc.cc.WaitForStateChange(ctx, connectivity.Ready)
   180  		if cc.cc.GetState() == connectivity.Shutdown {
   181  			return
   182  		}
   183  		cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
   184  	}
   185  }
   187  func (cc *controlChannel) close() {
   188  	cc.logger.Infof("Closing control channel")
   189  	cc.cc.Close()
   190  }
   192  type lookupCallback func(targets []string, headerData string, err error)
   194  // lookup starts a RouteLookup RPC in a separate goroutine and returns the
   195  // results (and error, if any) in the provided callback.
   196  //
   197  // The returned boolean indicates whether the request was throttled by the
   198  // client-side adaptive throttling algorithm in which case the provided callback
   199  // will not be invoked.
   200  func (cc *controlChannel) lookup(reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string, cb lookupCallback) (throttled bool) {
   201  	if cc.throttler.ShouldThrottle() {
   202  		cc.logger.Infof("RLS request throttled by client-side adaptive throttling")
   203  		return true
   204  	}
   205  	go func() {
   206  		req := &rlspb.RouteLookupRequest{
   207  			TargetType:      "grpc",
   208  			KeyMap:          reqKeys,
   209  			Reason:          reason,
   210  			StaleHeaderData: staleHeaders,
   211  		}
   212  		cc.logger.Infof("Sending RLS request %+v", pretty.ToJSON(req))
   214  		ctx, cancel := context.WithTimeout(context.Background(), cc.rpcTimeout)
   215  		defer cancel()
   216  		resp, err := cc.client.RouteLookup(ctx, req)
   217  		cb(resp.GetTargets(), resp.GetHeaderData(), err)
   218  	}()
   219  	return false
   220  }

View as plain text