...

Source file src/github.com/letsencrypt/boulder/grpc/client.go

Documentation: github.com/letsencrypt/boulder/grpc

     1  package grpc
     2  
     3  import (
     4  	"crypto/tls"
     5  	"errors"
     6  	"fmt"
     7  
     8  	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
     9  	"github.com/jmhodges/clock"
    10  	"github.com/letsencrypt/boulder/cmd"
    11  	bcreds "github.com/letsencrypt/boulder/grpc/creds"
    12  	"github.com/prometheus/client_golang/prometheus"
    13  	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    14  	"google.golang.org/grpc"
    15  
    16  	// 'grpc/health' is imported for its init function, which causes clients to
    17  	// rely on the Health Service for load-balancing.
    18  	// 'grpc/internal/resolver/dns' is imported for its init function, which
    19  	// registers the SRV resolver.
    20  	_ "github.com/letsencrypt/boulder/grpc/internal/resolver/dns"
    21  	"google.golang.org/grpc/balancer/roundrobin"
    22  	_ "google.golang.org/grpc/health"
    23  )
    24  
    25  // ClientSetup creates a gRPC TransportCredentials that presents
    26  // a client certificate and validates the the server certificate based
    27  // on the provided *tls.Config.
    28  // It dials the remote service and returns a grpc.ClientConn if successful.
    29  func ClientSetup(c *cmd.GRPCClientConfig, tlsConfig *tls.Config, statsRegistry prometheus.Registerer, clk clock.Clock) (*grpc.ClientConn, error) {
    30  	if c == nil {
    31  		return nil, errors.New("nil gRPC client config provided: JSON config is probably missing a fooService section")
    32  	}
    33  	if tlsConfig == nil {
    34  		return nil, errNilTLS
    35  	}
    36  
    37  	metrics, err := newClientMetrics(statsRegistry)
    38  	if err != nil {
    39  		return nil, err
    40  	}
    41  
    42  	cmi := clientMetadataInterceptor{c.Timeout.Duration, metrics, clk, !c.NoWaitForReady}
    43  
    44  	unaryInterceptors := []grpc.UnaryClientInterceptor{
    45  		cmi.Unary,
    46  		cmi.metrics.grpcMetrics.UnaryClientInterceptor(),
    47  		otelgrpc.UnaryClientInterceptor(),
    48  	}
    49  
    50  	streamInterceptors := []grpc.StreamClientInterceptor{
    51  		cmi.Stream,
    52  		cmi.metrics.grpcMetrics.StreamClientInterceptor(),
    53  		otelgrpc.StreamClientInterceptor(),
    54  	}
    55  
    56  	target, hostOverride, err := c.MakeTargetAndHostOverride()
    57  	if err != nil {
    58  		return nil, err
    59  	}
    60  
    61  	creds := bcreds.NewClientCredentials(tlsConfig.RootCAs, tlsConfig.Certificates, hostOverride)
    62  	return grpc.Dial(
    63  		target,
    64  		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name)),
    65  		grpc.WithTransportCredentials(creds),
    66  		grpc.WithChainUnaryInterceptor(unaryInterceptors...),
    67  		grpc.WithChainStreamInterceptor(streamInterceptors...),
    68  	)
    69  }
    70  
    71  // clientMetrics is a struct type used to return registered metrics from
    72  // `NewClientMetrics`
    73  type clientMetrics struct {
    74  	grpcMetrics *grpc_prometheus.ClientMetrics
    75  	// inFlightRPCs is a labelled gauge that slices by service/method the number
    76  	// of outstanding/in-flight RPCs.
    77  	inFlightRPCs *prometheus.GaugeVec
    78  }
    79  
    80  // newClientMetrics constructs a *grpc_prometheus.ClientMetrics, registered with
    81  // the given registry, with timing histogram enabled. It must be called a
    82  // maximum of once per registry, or there will be conflicting names.
    83  func newClientMetrics(stats prometheus.Registerer) (clientMetrics, error) {
    84  	// Create the grpc prometheus client metrics instance and register it
    85  	grpcMetrics := grpc_prometheus.NewClientMetrics()
    86  	grpcMetrics.EnableClientHandlingTimeHistogram()
    87  	err := stats.Register(grpcMetrics)
    88  	if err != nil {
    89  		are := prometheus.AlreadyRegisteredError{}
    90  		if errors.As(err, &are) {
    91  			grpcMetrics = are.ExistingCollector.(*grpc_prometheus.ClientMetrics)
    92  		} else {
    93  			return clientMetrics{}, err
    94  		}
    95  	}
    96  
    97  	// Create a gauge to track in-flight RPCs and register it.
    98  	inFlightGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
    99  		Name: "grpc_in_flight",
   100  		Help: "Number of in-flight (sent, not yet completed) RPCs",
   101  	}, []string{"method", "service"})
   102  	err = stats.Register(inFlightGauge)
   103  	if err != nil {
   104  		are := prometheus.AlreadyRegisteredError{}
   105  		if errors.As(err, &are) {
   106  			inFlightGauge = are.ExistingCollector.(*prometheus.GaugeVec)
   107  		} else {
   108  			return clientMetrics{}, err
   109  		}
   110  	}
   111  
   112  	return clientMetrics{
   113  		grpcMetrics:  grpcMetrics,
   114  		inFlightRPCs: inFlightGauge,
   115  	}, nil
   116  }
   117  

View as plain text