...
1 package grpc
2
3 import (
4 "crypto/tls"
5 "errors"
6 "fmt"
7
8 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
9 "github.com/jmhodges/clock"
10 "github.com/letsencrypt/boulder/cmd"
11 bcreds "github.com/letsencrypt/boulder/grpc/creds"
12 "github.com/prometheus/client_golang/prometheus"
13 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
14 "google.golang.org/grpc"
15
16
17
18
19
20 _ "github.com/letsencrypt/boulder/grpc/internal/resolver/dns"
21 "google.golang.org/grpc/balancer/roundrobin"
22 _ "google.golang.org/grpc/health"
23 )
24
25
26
27
28
29 func ClientSetup(c *cmd.GRPCClientConfig, tlsConfig *tls.Config, statsRegistry prometheus.Registerer, clk clock.Clock) (*grpc.ClientConn, error) {
30 if c == nil {
31 return nil, errors.New("nil gRPC client config provided: JSON config is probably missing a fooService section")
32 }
33 if tlsConfig == nil {
34 return nil, errNilTLS
35 }
36
37 metrics, err := newClientMetrics(statsRegistry)
38 if err != nil {
39 return nil, err
40 }
41
42 cmi := clientMetadataInterceptor{c.Timeout.Duration, metrics, clk, !c.NoWaitForReady}
43
44 unaryInterceptors := []grpc.UnaryClientInterceptor{
45 cmi.Unary,
46 cmi.metrics.grpcMetrics.UnaryClientInterceptor(),
47 otelgrpc.UnaryClientInterceptor(),
48 }
49
50 streamInterceptors := []grpc.StreamClientInterceptor{
51 cmi.Stream,
52 cmi.metrics.grpcMetrics.StreamClientInterceptor(),
53 otelgrpc.StreamClientInterceptor(),
54 }
55
56 target, hostOverride, err := c.MakeTargetAndHostOverride()
57 if err != nil {
58 return nil, err
59 }
60
61 creds := bcreds.NewClientCredentials(tlsConfig.RootCAs, tlsConfig.Certificates, hostOverride)
62 return grpc.Dial(
63 target,
64 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name)),
65 grpc.WithTransportCredentials(creds),
66 grpc.WithChainUnaryInterceptor(unaryInterceptors...),
67 grpc.WithChainStreamInterceptor(streamInterceptors...),
68 )
69 }
70
71
72
73 type clientMetrics struct {
74 grpcMetrics *grpc_prometheus.ClientMetrics
75
76
77 inFlightRPCs *prometheus.GaugeVec
78 }
79
80
81
82
83 func newClientMetrics(stats prometheus.Registerer) (clientMetrics, error) {
84
85 grpcMetrics := grpc_prometheus.NewClientMetrics()
86 grpcMetrics.EnableClientHandlingTimeHistogram()
87 err := stats.Register(grpcMetrics)
88 if err != nil {
89 are := prometheus.AlreadyRegisteredError{}
90 if errors.As(err, &are) {
91 grpcMetrics = are.ExistingCollector.(*grpc_prometheus.ClientMetrics)
92 } else {
93 return clientMetrics{}, err
94 }
95 }
96
97
98 inFlightGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
99 Name: "grpc_in_flight",
100 Help: "Number of in-flight (sent, not yet completed) RPCs",
101 }, []string{"method", "service"})
102 err = stats.Register(inFlightGauge)
103 if err != nil {
104 are := prometheus.AlreadyRegisteredError{}
105 if errors.As(err, &are) {
106 inFlightGauge = are.ExistingCollector.(*prometheus.GaugeVec)
107 } else {
108 return clientMetrics{}, err
109 }
110 }
111
112 return clientMetrics{
113 grpcMetrics: grpcMetrics,
114 inFlightRPCs: inFlightGauge,
115 }, nil
116 }
117
View as plain text