1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdmain
16
17 import (
18 "context"
19 "crypto/tls"
20 "crypto/x509"
21 "fmt"
22 "io/ioutil"
23 "log"
24 "math"
25 "net"
26 "net/http"
27 "net/url"
28 "os"
29 "path/filepath"
30 "time"
31
32 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
33 "go.etcd.io/etcd/client/pkg/v3/logutil"
34 "go.etcd.io/etcd/client/pkg/v3/tlsutil"
35 "go.etcd.io/etcd/client/pkg/v3/transport"
36 clientv3 "go.etcd.io/etcd/client/v3"
37 "go.etcd.io/etcd/client/v3/leasing"
38 "go.etcd.io/etcd/client/v3/namespace"
39 "go.etcd.io/etcd/client/v3/ordering"
40 "go.etcd.io/etcd/pkg/v3/debugutil"
41 "go.etcd.io/etcd/server/v3/embed"
42 "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
43 "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
44 "go.etcd.io/etcd/server/v3/proxy/grpcproxy"
45
46 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
47 "github.com/soheilhy/cmux"
48 "github.com/spf13/cobra"
49 "go.uber.org/zap"
50 "go.uber.org/zap/zapgrpc"
51 "golang.org/x/net/http2"
52 "google.golang.org/grpc"
53 "google.golang.org/grpc/grpclog"
54 "google.golang.org/grpc/keepalive"
55 )
56
// Settings of the "grpc-proxy start" command. Each variable is bound to a
// command-line flag in newGRPCProxyStartCommand and read when the proxy starts.
var (
	// Addresses the proxy listens on (--listen-addr, --metrics-addr).
	grpcProxyListenAddr        string
	grpcProxyMetricsListenAddr string

	// Backend etcd endpoints and DNS SRV discovery.
	grpcProxyEndpoints                 []string
	grpcProxyEndpointsAutoSyncInterval time.Duration
	grpcProxyDNSCluster                string
	grpcProxyDNSClusterServiceName     string
	grpcProxyInsecureDiscovery         bool

	// Tuning of the proxy's own client connection to etcd.
	grpcProxyDialKeepAliveTime    time.Duration
	grpcProxyDialKeepAliveTimeout time.Duration
	grpcProxyPermitWithoutStream  bool
	grpcMaxCallSendMsgSize        int
	grpcMaxCallRecvMsgSize        int

	// Data directory; auto-TLS certificates are generated beneath it.
	grpcProxyDataDir string

	// TLS for connecting to etcd (--cacert, --cert, --key,
	// --insecure-skip-tls-verify).
	grpcProxyCA                    string
	grpcProxyCert                  string
	grpcProxyKey                   string
	grpcProxyInsecureSkipTLSVerify bool

	// TLS for the proxy's own listener (--trusted-ca-file, --cert-file,
	// --key-file, --listen-cipher-suites, --auto-tls, --client-crl-file,
	// --self-signed-cert-validity).
	grpcProxyListenCA           string
	grpcProxyListenCert         string
	grpcProxyListenKey          string
	grpcProxyListenCipherSuites []string
	grpcProxyListenAutoTLS      bool
	grpcProxyListenCRL          string
	selfSignedCertValidity      uint

	// Registration of this proxy under a shared resolver prefix.
	grpcProxyAdvertiseClientURL string
	grpcProxyResolverPrefix     string
	grpcProxyResolverTTL        int

	// Key-space wrappers applied to the backend client.
	grpcProxyNamespace      string
	grpcProxyLeasing        string
	grpcProxyEnableOrdering bool

	// Diagnostics.
	grpcProxyEnablePprof bool
	grpcProxyDebug       bool

	// Keepalive policy and stream limit for connections accepted by the proxy.
	grpcKeepAliveMinTime  time.Duration
	grpcKeepAliveTimeout  time.Duration
	grpcKeepAliveInterval time.Duration
	maxConcurrentStreams  uint32
)

// defaultGRPCMaxCallSendMsgSize is the default of --max-send-bytes: 1.5 MiB.
const defaultGRPCMaxCallSendMsgSize = 1.5 * (1 << 20)
110
111 func init() {
112 rootCmd.AddCommand(newGRPCProxyCommand())
113 }
114
115
116 func newGRPCProxyCommand() *cobra.Command {
117 lpc := &cobra.Command{
118 Use: "grpc-proxy <subcommand>",
119 Short: "grpc-proxy related command",
120 }
121 lpc.AddCommand(newGRPCProxyStartCommand())
122
123 return lpc
124 }
125
126 func newGRPCProxyStartCommand() *cobra.Command {
127 cmd := cobra.Command{
128 Use: "start",
129 Short: "start the grpc proxy",
130 Run: startGRPCProxy,
131 }
132
133 cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
134 cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "domain name to query for SRV records describing cluster endpoints")
135 cmd.Flags().StringVar(&grpcProxyDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
136 cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for endpoint /metrics requests on an additional interface")
137 cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
138 cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
139 cmd.Flags().DurationVar(&grpcProxyEndpointsAutoSyncInterval, "endpoints-auto-sync-interval", 0, "etcd endpoints auto sync interval (disabled by default)")
140 cmd.Flags().DurationVar(&grpcProxyDialKeepAliveTime, "dial-keepalive-time", 0, "keepalive time for client(grpc-proxy) connections (default 0, disable).")
141 cmd.Flags().DurationVar(&grpcProxyDialKeepAliveTimeout, "dial-keepalive-timeout", embed.DefaultGRPCKeepAliveTimeout, "keepalive timeout for client(grpc-proxy) connections (default 20s).")
142 cmd.Flags().BoolVar(&grpcProxyPermitWithoutStream, "permit-without-stream", false, "Enable client(grpc-proxy) to send keepalive pings even with no active RPCs.")
143 cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
144 cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
145 cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
146 cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
147 cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
148 cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data")
149 cmd.Flags().IntVar(&grpcMaxCallSendMsgSize, "max-send-bytes", defaultGRPCMaxCallSendMsgSize, "message send limits in bytes (default value is 1.5 MiB)")
150 cmd.Flags().IntVar(&grpcMaxCallRecvMsgSize, "max-recv-bytes", math.MaxInt32, "message receive limits in bytes (default value is math.MaxInt32)")
151 cmd.Flags().DurationVar(&grpcKeepAliveMinTime, "grpc-keepalive-min-time", embed.DefaultGRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging proxy.")
152 cmd.Flags().DurationVar(&grpcKeepAliveInterval, "grpc-keepalive-interval", embed.DefaultGRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
153 cmd.Flags().DurationVar(&grpcKeepAliveTimeout, "grpc-keepalive-timeout", embed.DefaultGRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
154
155
156 cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
157 cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
158 cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
159 cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates (CAUTION: this option should be enabled only for testing purposes)")
160
161
162 cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
163 cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
164 cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
165 cmd.Flags().StringSliceVar(&grpcProxyListenCipherSuites, "listen-cipher-suites", grpcProxyListenCipherSuites, "Comma-separated list of supported TLS cipher suites between client/proxy (empty will be auto-populated by Go).")
166 cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
167 cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
168 cmd.Flags().UintVar(&selfSignedCertValidity, "self-signed-cert-validity", 1, "The validity period of the proxy certificates, unit is year")
169
170
171 cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
172 cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
173
174 cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
175
176 cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.")
177
178 return &cmd
179 }
180
181 func startGRPCProxy(cmd *cobra.Command, args []string) {
182 checkArgs()
183 lvl := zap.InfoLevel
184 if grpcProxyDebug {
185 lvl = zap.DebugLevel
186 grpc.EnableTracing = true
187 }
188 lg, err := logutil.CreateDefaultZapLogger(lvl)
189 if err != nil {
190 panic(err)
191 }
192 defer lg.Sync()
193
194 grpclog.SetLoggerV2(zapgrpc.NewLogger(lg))
195
196
197
198
199 tlsInfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey, false)
200 if len(grpcProxyListenCipherSuites) > 0 {
201 cs, err := tlsutil.GetCipherSuites(grpcProxyListenCipherSuites)
202 if err != nil {
203 log.Fatal(err)
204 }
205 tlsInfo.CipherSuites = cs
206 }
207 if tlsInfo == nil && grpcProxyListenAutoTLS {
208 host := []string{"https://" + grpcProxyListenAddr}
209 dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
210 autoTLS, err := transport.SelfCert(lg, dir, host, selfSignedCertValidity)
211 if err != nil {
212 log.Fatal(err)
213 }
214 tlsInfo = &autoTLS
215 }
216
217 if tlsInfo != nil {
218 lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsInfo)))
219 }
220 m := mustListenCMux(lg, tlsInfo)
221 grpcl := m.Match(cmux.HTTP2())
222 defer func() {
223 grpcl.Close()
224 lg.Info("stop listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
225 }()
226
227 client := mustNewClient(lg)
228
229
230
231 var proxyClient *clientv3.Client
232 if grpcProxyAdvertiseClientURL != "" {
233 proxyClient = mustNewProxyClient(lg, tlsInfo)
234 }
235 httpClient := mustNewHTTPClient(lg)
236
237 srvhttp, httpl := mustHTTPListener(lg, m, tlsInfo, client, proxyClient)
238
239 if err := http2.ConfigureServer(srvhttp, &http2.Server{
240 MaxConcurrentStreams: maxConcurrentStreams,
241 }); err != nil {
242 lg.Fatal("Failed to configure the http server", zap.Error(err))
243 }
244
245 errc := make(chan error, 3)
246 go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
247 go func() { errc <- srvhttp.Serve(httpl) }()
248 go func() { errc <- m.Serve() }()
249 if len(grpcProxyMetricsListenAddr) > 0 {
250 mhttpl := mustMetricsListener(lg, tlsInfo)
251 go func() {
252 mux := http.NewServeMux()
253 grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints())
254 grpcproxy.HandleHealth(lg, mux, client)
255 grpcproxy.HandleProxyMetrics(mux)
256 grpcproxy.HandleProxyHealth(lg, mux, proxyClient)
257 lg.Info("gRPC proxy server metrics URL serving")
258 herr := http.Serve(mhttpl, mux)
259 if herr != nil {
260 lg.Fatal("gRPC proxy server metrics URL returned", zap.Error(herr))
261 } else {
262 lg.Info("gRPC proxy server metrics URL returned")
263 }
264 }()
265 }
266
267 lg.Info("started gRPC proxy", zap.String("address", grpcProxyListenAddr))
268
269
270 notifySystemd(lg)
271
272 fmt.Fprintln(os.Stderr, <-errc)
273 os.Exit(1)
274 }
275
276 func checkArgs() {
277 if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
278 fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
279 os.Exit(1)
280 }
281 if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
282 fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
283 os.Exit(1)
284 }
285 if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
286 fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
287 os.Exit(1)
288 }
289 if grpcProxyListenAutoTLS && selfSignedCertValidity == 0 {
290 fmt.Fprintln(os.Stderr, fmt.Errorf("selfSignedCertValidity is invalid,it should be greater than 0"))
291 os.Exit(1)
292 }
293 }
294
295 func mustNewClient(lg *zap.Logger) *clientv3.Client {
296 srvs := discoverEndpoints(lg, grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery, grpcProxyDNSClusterServiceName)
297 eps := srvs.Endpoints
298 if len(eps) == 0 {
299 eps = grpcProxyEndpoints
300 }
301 cfg, err := newClientCfg(lg, eps)
302 if err != nil {
303 fmt.Fprintln(os.Stderr, err)
304 os.Exit(1)
305 }
306 cfg.DialOptions = append(cfg.DialOptions,
307 grpc.WithUnaryInterceptor(grpcproxy.AuthUnaryClientInterceptor))
308 cfg.DialOptions = append(cfg.DialOptions,
309 grpc.WithStreamInterceptor(grpcproxy.AuthStreamClientInterceptor))
310 cfg.Logger = lg.Named("client")
311 client, err := clientv3.New(*cfg)
312 if err != nil {
313 fmt.Fprintln(os.Stderr, err)
314 os.Exit(1)
315 }
316 return client
317 }
318
319 func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client {
320 eps := []string{grpcProxyAdvertiseClientURL}
321 cfg, err := newProxyClientCfg(lg.Named("client"), eps, tls)
322 if err != nil {
323 fmt.Fprintln(os.Stderr, err)
324 os.Exit(1)
325 }
326 client, err := clientv3.New(*cfg)
327 if err != nil {
328 fmt.Fprintln(os.Stderr, err)
329 os.Exit(1)
330 }
331 lg.Info("create proxy client", zap.String("grpcProxyAdvertiseClientURL", grpcProxyAdvertiseClientURL))
332 return client
333 }
334
335 func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) {
336 cfg := clientv3.Config{
337 Endpoints: eps,
338 DialTimeout: 5 * time.Second,
339 Logger: lg,
340 }
341 if tls != nil {
342 clientTLS, err := tls.ClientConfig()
343 if err != nil {
344 return nil, err
345 }
346 cfg.TLS = clientTLS
347 }
348 return &cfg, nil
349 }
350
351 func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
352
353 cfg := clientv3.Config{
354 Endpoints: eps,
355 AutoSyncInterval: grpcProxyEndpointsAutoSyncInterval,
356 DialTimeout: 5 * time.Second,
357 }
358
359 if grpcMaxCallSendMsgSize > 0 {
360 cfg.MaxCallSendMsgSize = grpcMaxCallSendMsgSize
361 }
362 if grpcMaxCallRecvMsgSize > 0 {
363 cfg.MaxCallRecvMsgSize = grpcMaxCallRecvMsgSize
364 }
365 if grpcProxyDialKeepAliveTime > 0 {
366 cfg.DialKeepAliveTime = grpcProxyDialKeepAliveTime
367 }
368 if grpcProxyDialKeepAliveTimeout > 0 {
369 cfg.DialKeepAliveTimeout = grpcProxyDialKeepAliveTimeout
370 }
371 cfg.PermitWithoutStream = grpcProxyPermitWithoutStream
372
373 tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey, true)
374 if tls == nil && grpcProxyInsecureSkipTLSVerify {
375 tls = &transport.TLSInfo{}
376 }
377 if tls != nil {
378 clientTLS, err := tls.ClientConfig()
379 if err != nil {
380 return nil, err
381 }
382 clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
383 if clientTLS.InsecureSkipVerify {
384 lg.Warn("--insecure-skip-tls-verify was given, this grpc proxy process skips authentication of etcd server TLS certificates. This option should be enabled only for testing purposes.")
385 }
386 cfg.TLS = clientTLS
387 lg.Info("gRPC proxy client TLS", zap.String("tls-info", fmt.Sprintf("%+v", tls)))
388 }
389 return &cfg, nil
390 }
391
392 func newTLS(ca, cert, key string, requireEmptyCN bool) *transport.TLSInfo {
393 if ca == "" && cert == "" && key == "" {
394 return nil
395 }
396 return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key, EmptyCN: requireEmptyCN}
397 }
398
399 func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
400 l, err := net.Listen("tcp", grpcProxyListenAddr)
401 if err != nil {
402 fmt.Fprintln(os.Stderr, err)
403 os.Exit(1)
404 }
405
406 if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
407 fmt.Fprintln(os.Stderr, err)
408 os.Exit(1)
409 }
410 if tlsinfo != nil {
411 tlsinfo.CRLFile = grpcProxyListenCRL
412 if l, err = transport.NewTLSListener(l, tlsinfo); err != nil {
413 lg.Fatal("failed to create TLS listener", zap.Error(err))
414 }
415 }
416
417 lg.Info("listening for gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
418 return cmux.New(l)
419 }
420
421 func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
422 if grpcProxyEnableOrdering {
423 vf := ordering.NewOrderViolationSwitchEndpointClosure(client)
424 client.KV = ordering.NewKV(client.KV, vf)
425 lg.Info("waiting for linearized read from cluster to recover ordering")
426 for {
427 _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
428 if err == nil {
429 break
430 }
431 lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
432 time.Sleep(time.Second)
433 }
434 }
435
436 if len(grpcProxyNamespace) > 0 {
437 client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
438 client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
439 client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
440 }
441
442 if len(grpcProxyLeasing) > 0 {
443 client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
444 }
445
446 kvp, _ := grpcproxy.NewKvProxy(client)
447 watchp, _ := grpcproxy.NewWatchProxy(client.Ctx(), lg, client)
448 if grpcProxyResolverPrefix != "" {
449 grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
450 }
451 clusterp, _ := grpcproxy.NewClusterProxy(lg, client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
452 leasep, _ := grpcproxy.NewLeaseProxy(client.Ctx(), client)
453
454 mainp := grpcproxy.NewMaintenanceProxy(client)
455 authp := grpcproxy.NewAuthProxy(client)
456 electionp := grpcproxy.NewElectionProxy(client)
457 lockp := grpcproxy.NewLockProxy(client)
458
459 gopts := []grpc.ServerOption{
460 grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
461 grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
462 grpc.MaxConcurrentStreams(math.MaxUint32),
463 }
464 if grpcKeepAliveMinTime > time.Duration(0) {
465 gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
466 MinTime: grpcKeepAliveMinTime,
467 PermitWithoutStream: false,
468 }))
469 }
470 if grpcKeepAliveInterval > time.Duration(0) ||
471 grpcKeepAliveTimeout > time.Duration(0) {
472 gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
473 Time: grpcKeepAliveInterval,
474 Timeout: grpcKeepAliveTimeout,
475 }))
476 }
477
478 server := grpc.NewServer(gopts...)
479
480 pb.RegisterKVServer(server, kvp)
481 pb.RegisterWatchServer(server, watchp)
482 pb.RegisterClusterServer(server, clusterp)
483 pb.RegisterLeaseServer(server, leasep)
484 pb.RegisterMaintenanceServer(server, mainp)
485 pb.RegisterAuthServer(server, authp)
486 v3electionpb.RegisterElectionServer(server, electionp)
487 v3lockpb.RegisterLockServer(server, lockp)
488
489 return server
490 }
491
492 func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) {
493 httpClient := mustNewHTTPClient(lg)
494 httpmux := http.NewServeMux()
495 httpmux.HandleFunc("/", http.NotFound)
496 grpcproxy.HandleMetrics(httpmux, httpClient, c.Endpoints())
497 grpcproxy.HandleHealth(lg, httpmux, c)
498 grpcproxy.HandleProxyMetrics(httpmux)
499 grpcproxy.HandleProxyHealth(lg, httpmux, proxy)
500 if grpcProxyEnablePprof {
501 for p, h := range debugutil.PProfHandlers() {
502 httpmux.Handle(p, h)
503 }
504 lg.Info("gRPC proxy enabled pprof", zap.String("path", debugutil.HTTPPrefixPProf))
505 }
506 srvhttp := &http.Server{
507 Handler: httpmux,
508 ErrorLog: log.New(ioutil.Discard, "net/http", 0),
509 }
510
511 if tlsinfo == nil {
512 return srvhttp, m.Match(cmux.HTTP1())
513 }
514
515 srvTLS, err := tlsinfo.ServerConfig()
516 if err != nil {
517 lg.Fatal("failed to set up TLS", zap.Error(err))
518 }
519 srvhttp.TLSConfig = srvTLS
520 return srvhttp, m.Match(cmux.Any())
521 }
522
523 func mustNewHTTPClient(lg *zap.Logger) *http.Client {
524 transport, err := newHTTPTransport(grpcProxyCA, grpcProxyCert, grpcProxyKey)
525 if err != nil {
526 fmt.Fprintln(os.Stderr, err)
527 os.Exit(1)
528 }
529 return &http.Client{Transport: transport}
530 }
531
532 func newHTTPTransport(ca, cert, key string) (*http.Transport, error) {
533 tr := &http.Transport{}
534
535 if ca != "" && cert != "" && key != "" {
536 caCert, err := ioutil.ReadFile(ca)
537 if err != nil {
538 return nil, err
539 }
540 keyPair, err := tls.LoadX509KeyPair(cert, key)
541 if err != nil {
542 return nil, err
543 }
544 caPool := x509.NewCertPool()
545 caPool.AppendCertsFromPEM(caCert)
546
547 tlsConfig := &tls.Config{
548 Certificates: []tls.Certificate{keyPair},
549 RootCAs: caPool,
550 }
551 tlsConfig.BuildNameToCertificate()
552 tr.TLSClientConfig = tlsConfig
553 } else if grpcProxyInsecureSkipTLSVerify {
554 tlsConfig := &tls.Config{InsecureSkipVerify: grpcProxyInsecureSkipTLSVerify}
555 tr.TLSClientConfig = tlsConfig
556 }
557 return tr, nil
558 }
559
560 func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener {
561 murl, err := url.Parse(grpcProxyMetricsListenAddr)
562 if err != nil {
563 fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
564 os.Exit(1)
565 }
566 ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
567 if err != nil {
568 fmt.Fprintln(os.Stderr, err)
569 os.Exit(1)
570 }
571 lg.Info("gRPC proxy listening for metrics", zap.String("address", murl.String()))
572 return ml
573 }
574
View as plain text