...

Source file src/go.etcd.io/etcd/server/v3/etcdmain/grpc_proxy.go

Documentation: go.etcd.io/etcd/server/v3/etcdmain

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdmain
    16  
    17  import (
    18  	"context"
    19  	"crypto/tls"
    20  	"crypto/x509"
    21  	"fmt"
    22  	"io/ioutil"
    23  	"log"
    24  	"math"
    25  	"net"
    26  	"net/http"
    27  	"net/url"
    28  	"os"
    29  	"path/filepath"
    30  	"time"
    31  
    32  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    33  	"go.etcd.io/etcd/client/pkg/v3/logutil"
    34  	"go.etcd.io/etcd/client/pkg/v3/tlsutil"
    35  	"go.etcd.io/etcd/client/pkg/v3/transport"
    36  	clientv3 "go.etcd.io/etcd/client/v3"
    37  	"go.etcd.io/etcd/client/v3/leasing"
    38  	"go.etcd.io/etcd/client/v3/namespace"
    39  	"go.etcd.io/etcd/client/v3/ordering"
    40  	"go.etcd.io/etcd/pkg/v3/debugutil"
    41  	"go.etcd.io/etcd/server/v3/embed"
    42  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
    43  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
    44  	"go.etcd.io/etcd/server/v3/proxy/grpcproxy"
    45  
    46  	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
    47  	"github.com/soheilhy/cmux"
    48  	"github.com/spf13/cobra"
    49  	"go.uber.org/zap"
    50  	"go.uber.org/zap/zapgrpc"
    51  	"golang.org/x/net/http2"
    52  	"google.golang.org/grpc"
    53  	"google.golang.org/grpc/grpclog"
    54  	"google.golang.org/grpc/keepalive"
    55  )
    56  
    57  var (
    58  	grpcProxyListenAddr                string
    59  	grpcProxyMetricsListenAddr         string
    60  	grpcProxyEndpoints                 []string
    61  	grpcProxyEndpointsAutoSyncInterval time.Duration
    62  	grpcProxyDialKeepAliveTime         time.Duration
    63  	grpcProxyDialKeepAliveTimeout      time.Duration
    64  	grpcProxyPermitWithoutStream       bool
    65  	grpcProxyDNSCluster                string
    66  	grpcProxyDNSClusterServiceName     string
    67  	grpcProxyInsecureDiscovery         bool
    68  	grpcProxyDataDir                   string
    69  	grpcMaxCallSendMsgSize             int
    70  	grpcMaxCallRecvMsgSize             int
    71  
    72  	// tls for connecting to etcd
    73  
    74  	grpcProxyCA                    string
    75  	grpcProxyCert                  string
    76  	grpcProxyKey                   string
    77  	grpcProxyInsecureSkipTLSVerify bool
    78  
    79  	// tls for clients connecting to proxy
    80  
    81  	grpcProxyListenCA           string
    82  	grpcProxyListenCert         string
    83  	grpcProxyListenKey          string
    84  	grpcProxyListenCipherSuites []string
    85  	grpcProxyListenAutoTLS      bool
    86  	grpcProxyListenCRL          string
    87  	selfSignedCertValidity      uint
    88  
    89  	grpcProxyAdvertiseClientURL string
    90  	grpcProxyResolverPrefix     string
    91  	grpcProxyResolverTTL        int
    92  
    93  	grpcProxyNamespace string
    94  	grpcProxyLeasing   string
    95  
    96  	grpcProxyEnablePprof    bool
    97  	grpcProxyEnableOrdering bool
    98  
    99  	grpcProxyDebug bool
   100  
   101  	// GRPC keep alive related options.
   102  	grpcKeepAliveMinTime  time.Duration
   103  	grpcKeepAliveTimeout  time.Duration
   104  	grpcKeepAliveInterval time.Duration
   105  
   106  	maxConcurrentStreams uint32
   107  )
   108  
   109  const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
   110  
   111  func init() {
   112  	rootCmd.AddCommand(newGRPCProxyCommand())
   113  }
   114  
   115  // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
   116  func newGRPCProxyCommand() *cobra.Command {
   117  	lpc := &cobra.Command{
   118  		Use:   "grpc-proxy <subcommand>",
   119  		Short: "grpc-proxy related command",
   120  	}
   121  	lpc.AddCommand(newGRPCProxyStartCommand())
   122  
   123  	return lpc
   124  }
   125  
   126  func newGRPCProxyStartCommand() *cobra.Command {
   127  	cmd := cobra.Command{
   128  		Use:   "start",
   129  		Short: "start the grpc proxy",
   130  		Run:   startGRPCProxy,
   131  	}
   132  
   133  	cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
   134  	cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "domain name to query for SRV records describing cluster endpoints")
   135  	cmd.Flags().StringVar(&grpcProxyDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
   136  	cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for endpoint /metrics requests on an additional interface")
   137  	cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
   138  	cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
   139  	cmd.Flags().DurationVar(&grpcProxyEndpointsAutoSyncInterval, "endpoints-auto-sync-interval", 0, "etcd endpoints auto sync interval (disabled by default)")
   140  	cmd.Flags().DurationVar(&grpcProxyDialKeepAliveTime, "dial-keepalive-time", 0, "keepalive time for client(grpc-proxy) connections (default 0, disable).")
   141  	cmd.Flags().DurationVar(&grpcProxyDialKeepAliveTimeout, "dial-keepalive-timeout", embed.DefaultGRPCKeepAliveTimeout, "keepalive timeout for client(grpc-proxy) connections (default 20s).")
   142  	cmd.Flags().BoolVar(&grpcProxyPermitWithoutStream, "permit-without-stream", false, "Enable client(grpc-proxy) to send keepalive pings even with no active RPCs.")
   143  	cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
   144  	cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
   145  	cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
   146  	cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
   147  	cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
   148  	cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data")
   149  	cmd.Flags().IntVar(&grpcMaxCallSendMsgSize, "max-send-bytes", defaultGRPCMaxCallSendMsgSize, "message send limits in bytes (default value is 1.5 MiB)")
   150  	cmd.Flags().IntVar(&grpcMaxCallRecvMsgSize, "max-recv-bytes", math.MaxInt32, "message receive limits in bytes (default value is math.MaxInt32)")
   151  	cmd.Flags().DurationVar(&grpcKeepAliveMinTime, "grpc-keepalive-min-time", embed.DefaultGRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging proxy.")
   152  	cmd.Flags().DurationVar(&grpcKeepAliveInterval, "grpc-keepalive-interval", embed.DefaultGRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
   153  	cmd.Flags().DurationVar(&grpcKeepAliveTimeout, "grpc-keepalive-timeout", embed.DefaultGRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
   154  
   155  	// client TLS for connecting to server
   156  	cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
   157  	cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
   158  	cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
   159  	cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates (CAUTION: this option should be enabled only for testing purposes)")
   160  
   161  	// client TLS for connecting to proxy
   162  	cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
   163  	cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
   164  	cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
   165  	cmd.Flags().StringSliceVar(&grpcProxyListenCipherSuites, "listen-cipher-suites", grpcProxyListenCipherSuites, "Comma-separated list of supported TLS cipher suites between client/proxy (empty will be auto-populated by Go).")
   166  	cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
   167  	cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
   168  	cmd.Flags().UintVar(&selfSignedCertValidity, "self-signed-cert-validity", 1, "The validity period of the proxy certificates, unit is year")
   169  
   170  	// experimental flags
   171  	cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
   172  	cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
   173  
   174  	cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
   175  
   176  	cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.")
   177  
   178  	return &cmd
   179  }
   180  
   181  func startGRPCProxy(cmd *cobra.Command, args []string) {
   182  	checkArgs()
   183  	lvl := zap.InfoLevel
   184  	if grpcProxyDebug {
   185  		lvl = zap.DebugLevel
   186  		grpc.EnableTracing = true
   187  	}
   188  	lg, err := logutil.CreateDefaultZapLogger(lvl)
   189  	if err != nil {
   190  		panic(err)
   191  	}
   192  	defer lg.Sync()
   193  
   194  	grpclog.SetLoggerV2(zapgrpc.NewLogger(lg))
   195  
   196  	// The proxy itself (ListenCert) can have not-empty CN.
   197  	// The empty CN is required for grpcProxyCert.
   198  	// Please see https://github.com/etcd-io/etcd/issues/11970#issuecomment-687875315  for more context.
   199  	tlsInfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey, false)
   200  	if len(grpcProxyListenCipherSuites) > 0 {
   201  		cs, err := tlsutil.GetCipherSuites(grpcProxyListenCipherSuites)
   202  		if err != nil {
   203  			log.Fatal(err)
   204  		}
   205  		tlsInfo.CipherSuites = cs
   206  	}
   207  	if tlsInfo == nil && grpcProxyListenAutoTLS {
   208  		host := []string{"https://" + grpcProxyListenAddr}
   209  		dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
   210  		autoTLS, err := transport.SelfCert(lg, dir, host, selfSignedCertValidity)
   211  		if err != nil {
   212  			log.Fatal(err)
   213  		}
   214  		tlsInfo = &autoTLS
   215  	}
   216  
   217  	if tlsInfo != nil {
   218  		lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsInfo)))
   219  	}
   220  	m := mustListenCMux(lg, tlsInfo)
   221  	grpcl := m.Match(cmux.HTTP2())
   222  	defer func() {
   223  		grpcl.Close()
   224  		lg.Info("stop listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
   225  	}()
   226  
   227  	client := mustNewClient(lg)
   228  
   229  	// The proxy client is used for self-healthchecking.
   230  	// TODO: The mechanism should be refactored to use internal connection.
   231  	var proxyClient *clientv3.Client
   232  	if grpcProxyAdvertiseClientURL != "" {
   233  		proxyClient = mustNewProxyClient(lg, tlsInfo)
   234  	}
   235  	httpClient := mustNewHTTPClient(lg)
   236  
   237  	srvhttp, httpl := mustHTTPListener(lg, m, tlsInfo, client, proxyClient)
   238  
   239  	if err := http2.ConfigureServer(srvhttp, &http2.Server{
   240  		MaxConcurrentStreams: maxConcurrentStreams,
   241  	}); err != nil {
   242  		lg.Fatal("Failed to configure the http server", zap.Error(err))
   243  	}
   244  
   245  	errc := make(chan error, 3)
   246  	go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
   247  	go func() { errc <- srvhttp.Serve(httpl) }()
   248  	go func() { errc <- m.Serve() }()
   249  	if len(grpcProxyMetricsListenAddr) > 0 {
   250  		mhttpl := mustMetricsListener(lg, tlsInfo)
   251  		go func() {
   252  			mux := http.NewServeMux()
   253  			grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints())
   254  			grpcproxy.HandleHealth(lg, mux, client)
   255  			grpcproxy.HandleProxyMetrics(mux)
   256  			grpcproxy.HandleProxyHealth(lg, mux, proxyClient)
   257  			lg.Info("gRPC proxy server metrics URL serving")
   258  			herr := http.Serve(mhttpl, mux)
   259  			if herr != nil {
   260  				lg.Fatal("gRPC proxy server metrics URL returned", zap.Error(herr))
   261  			} else {
   262  				lg.Info("gRPC proxy server metrics URL returned")
   263  			}
   264  		}()
   265  	}
   266  
   267  	lg.Info("started gRPC proxy", zap.String("address", grpcProxyListenAddr))
   268  
   269  	// grpc-proxy is initialized, ready to serve
   270  	notifySystemd(lg)
   271  
   272  	fmt.Fprintln(os.Stderr, <-errc)
   273  	os.Exit(1)
   274  }
   275  
   276  func checkArgs() {
   277  	if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
   278  		fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
   279  		os.Exit(1)
   280  	}
   281  	if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
   282  		fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
   283  		os.Exit(1)
   284  	}
   285  	if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
   286  		fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
   287  		os.Exit(1)
   288  	}
   289  	if grpcProxyListenAutoTLS && selfSignedCertValidity == 0 {
   290  		fmt.Fprintln(os.Stderr, fmt.Errorf("selfSignedCertValidity is invalid,it should be greater than 0"))
   291  		os.Exit(1)
   292  	}
   293  }
   294  
   295  func mustNewClient(lg *zap.Logger) *clientv3.Client {
   296  	srvs := discoverEndpoints(lg, grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery, grpcProxyDNSClusterServiceName)
   297  	eps := srvs.Endpoints
   298  	if len(eps) == 0 {
   299  		eps = grpcProxyEndpoints
   300  	}
   301  	cfg, err := newClientCfg(lg, eps)
   302  	if err != nil {
   303  		fmt.Fprintln(os.Stderr, err)
   304  		os.Exit(1)
   305  	}
   306  	cfg.DialOptions = append(cfg.DialOptions,
   307  		grpc.WithUnaryInterceptor(grpcproxy.AuthUnaryClientInterceptor))
   308  	cfg.DialOptions = append(cfg.DialOptions,
   309  		grpc.WithStreamInterceptor(grpcproxy.AuthStreamClientInterceptor))
   310  	cfg.Logger = lg.Named("client")
   311  	client, err := clientv3.New(*cfg)
   312  	if err != nil {
   313  		fmt.Fprintln(os.Stderr, err)
   314  		os.Exit(1)
   315  	}
   316  	return client
   317  }
   318  
   319  func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client {
   320  	eps := []string{grpcProxyAdvertiseClientURL}
   321  	cfg, err := newProxyClientCfg(lg.Named("client"), eps, tls)
   322  	if err != nil {
   323  		fmt.Fprintln(os.Stderr, err)
   324  		os.Exit(1)
   325  	}
   326  	client, err := clientv3.New(*cfg)
   327  	if err != nil {
   328  		fmt.Fprintln(os.Stderr, err)
   329  		os.Exit(1)
   330  	}
   331  	lg.Info("create proxy client", zap.String("grpcProxyAdvertiseClientURL", grpcProxyAdvertiseClientURL))
   332  	return client
   333  }
   334  
   335  func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) {
   336  	cfg := clientv3.Config{
   337  		Endpoints:   eps,
   338  		DialTimeout: 5 * time.Second,
   339  		Logger:      lg,
   340  	}
   341  	if tls != nil {
   342  		clientTLS, err := tls.ClientConfig()
   343  		if err != nil {
   344  			return nil, err
   345  		}
   346  		cfg.TLS = clientTLS
   347  	}
   348  	return &cfg, nil
   349  }
   350  
   351  func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
   352  	// set tls if any one tls option set
   353  	cfg := clientv3.Config{
   354  		Endpoints:        eps,
   355  		AutoSyncInterval: grpcProxyEndpointsAutoSyncInterval,
   356  		DialTimeout:      5 * time.Second,
   357  	}
   358  
   359  	if grpcMaxCallSendMsgSize > 0 {
   360  		cfg.MaxCallSendMsgSize = grpcMaxCallSendMsgSize
   361  	}
   362  	if grpcMaxCallRecvMsgSize > 0 {
   363  		cfg.MaxCallRecvMsgSize = grpcMaxCallRecvMsgSize
   364  	}
   365  	if grpcProxyDialKeepAliveTime > 0 {
   366  		cfg.DialKeepAliveTime = grpcProxyDialKeepAliveTime
   367  	}
   368  	if grpcProxyDialKeepAliveTimeout > 0 {
   369  		cfg.DialKeepAliveTimeout = grpcProxyDialKeepAliveTimeout
   370  	}
   371  	cfg.PermitWithoutStream = grpcProxyPermitWithoutStream
   372  
   373  	tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey, true)
   374  	if tls == nil && grpcProxyInsecureSkipTLSVerify {
   375  		tls = &transport.TLSInfo{}
   376  	}
   377  	if tls != nil {
   378  		clientTLS, err := tls.ClientConfig()
   379  		if err != nil {
   380  			return nil, err
   381  		}
   382  		clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
   383  		if clientTLS.InsecureSkipVerify {
   384  			lg.Warn("--insecure-skip-tls-verify was given, this grpc proxy process skips authentication of etcd server TLS certificates. This option should be enabled only for testing purposes.")
   385  		}
   386  		cfg.TLS = clientTLS
   387  		lg.Info("gRPC proxy client TLS", zap.String("tls-info", fmt.Sprintf("%+v", tls)))
   388  	}
   389  	return &cfg, nil
   390  }
   391  
   392  func newTLS(ca, cert, key string, requireEmptyCN bool) *transport.TLSInfo {
   393  	if ca == "" && cert == "" && key == "" {
   394  		return nil
   395  	}
   396  	return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key, EmptyCN: requireEmptyCN}
   397  }
   398  
   399  func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
   400  	l, err := net.Listen("tcp", grpcProxyListenAddr)
   401  	if err != nil {
   402  		fmt.Fprintln(os.Stderr, err)
   403  		os.Exit(1)
   404  	}
   405  
   406  	if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
   407  		fmt.Fprintln(os.Stderr, err)
   408  		os.Exit(1)
   409  	}
   410  	if tlsinfo != nil {
   411  		tlsinfo.CRLFile = grpcProxyListenCRL
   412  		if l, err = transport.NewTLSListener(l, tlsinfo); err != nil {
   413  			lg.Fatal("failed to create TLS listener", zap.Error(err))
   414  		}
   415  	}
   416  
   417  	lg.Info("listening for gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
   418  	return cmux.New(l)
   419  }
   420  
   421  func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
   422  	if grpcProxyEnableOrdering {
   423  		vf := ordering.NewOrderViolationSwitchEndpointClosure(client)
   424  		client.KV = ordering.NewKV(client.KV, vf)
   425  		lg.Info("waiting for linearized read from cluster to recover ordering")
   426  		for {
   427  			_, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
   428  			if err == nil {
   429  				break
   430  			}
   431  			lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
   432  			time.Sleep(time.Second)
   433  		}
   434  	}
   435  
   436  	if len(grpcProxyNamespace) > 0 {
   437  		client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
   438  		client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
   439  		client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
   440  	}
   441  
   442  	if len(grpcProxyLeasing) > 0 {
   443  		client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
   444  	}
   445  
   446  	kvp, _ := grpcproxy.NewKvProxy(client)
   447  	watchp, _ := grpcproxy.NewWatchProxy(client.Ctx(), lg, client)
   448  	if grpcProxyResolverPrefix != "" {
   449  		grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
   450  	}
   451  	clusterp, _ := grpcproxy.NewClusterProxy(lg, client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
   452  	leasep, _ := grpcproxy.NewLeaseProxy(client.Ctx(), client)
   453  
   454  	mainp := grpcproxy.NewMaintenanceProxy(client)
   455  	authp := grpcproxy.NewAuthProxy(client)
   456  	electionp := grpcproxy.NewElectionProxy(client)
   457  	lockp := grpcproxy.NewLockProxy(client)
   458  
   459  	gopts := []grpc.ServerOption{
   460  		grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
   461  		grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
   462  		grpc.MaxConcurrentStreams(math.MaxUint32),
   463  	}
   464  	if grpcKeepAliveMinTime > time.Duration(0) {
   465  		gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
   466  			MinTime:             grpcKeepAliveMinTime,
   467  			PermitWithoutStream: false,
   468  		}))
   469  	}
   470  	if grpcKeepAliveInterval > time.Duration(0) ||
   471  		grpcKeepAliveTimeout > time.Duration(0) {
   472  		gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
   473  			Time:    grpcKeepAliveInterval,
   474  			Timeout: grpcKeepAliveTimeout,
   475  		}))
   476  	}
   477  
   478  	server := grpc.NewServer(gopts...)
   479  
   480  	pb.RegisterKVServer(server, kvp)
   481  	pb.RegisterWatchServer(server, watchp)
   482  	pb.RegisterClusterServer(server, clusterp)
   483  	pb.RegisterLeaseServer(server, leasep)
   484  	pb.RegisterMaintenanceServer(server, mainp)
   485  	pb.RegisterAuthServer(server, authp)
   486  	v3electionpb.RegisterElectionServer(server, electionp)
   487  	v3lockpb.RegisterLockServer(server, lockp)
   488  
   489  	return server
   490  }
   491  
   492  func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) {
   493  	httpClient := mustNewHTTPClient(lg)
   494  	httpmux := http.NewServeMux()
   495  	httpmux.HandleFunc("/", http.NotFound)
   496  	grpcproxy.HandleMetrics(httpmux, httpClient, c.Endpoints())
   497  	grpcproxy.HandleHealth(lg, httpmux, c)
   498  	grpcproxy.HandleProxyMetrics(httpmux)
   499  	grpcproxy.HandleProxyHealth(lg, httpmux, proxy)
   500  	if grpcProxyEnablePprof {
   501  		for p, h := range debugutil.PProfHandlers() {
   502  			httpmux.Handle(p, h)
   503  		}
   504  		lg.Info("gRPC proxy enabled pprof", zap.String("path", debugutil.HTTPPrefixPProf))
   505  	}
   506  	srvhttp := &http.Server{
   507  		Handler:  httpmux,
   508  		ErrorLog: log.New(ioutil.Discard, "net/http", 0),
   509  	}
   510  
   511  	if tlsinfo == nil {
   512  		return srvhttp, m.Match(cmux.HTTP1())
   513  	}
   514  
   515  	srvTLS, err := tlsinfo.ServerConfig()
   516  	if err != nil {
   517  		lg.Fatal("failed to set up TLS", zap.Error(err))
   518  	}
   519  	srvhttp.TLSConfig = srvTLS
   520  	return srvhttp, m.Match(cmux.Any())
   521  }
   522  
   523  func mustNewHTTPClient(lg *zap.Logger) *http.Client {
   524  	transport, err := newHTTPTransport(grpcProxyCA, grpcProxyCert, grpcProxyKey)
   525  	if err != nil {
   526  		fmt.Fprintln(os.Stderr, err)
   527  		os.Exit(1)
   528  	}
   529  	return &http.Client{Transport: transport}
   530  }
   531  
   532  func newHTTPTransport(ca, cert, key string) (*http.Transport, error) {
   533  	tr := &http.Transport{}
   534  
   535  	if ca != "" && cert != "" && key != "" {
   536  		caCert, err := ioutil.ReadFile(ca)
   537  		if err != nil {
   538  			return nil, err
   539  		}
   540  		keyPair, err := tls.LoadX509KeyPair(cert, key)
   541  		if err != nil {
   542  			return nil, err
   543  		}
   544  		caPool := x509.NewCertPool()
   545  		caPool.AppendCertsFromPEM(caCert)
   546  
   547  		tlsConfig := &tls.Config{
   548  			Certificates: []tls.Certificate{keyPair},
   549  			RootCAs:      caPool,
   550  		}
   551  		tlsConfig.BuildNameToCertificate()
   552  		tr.TLSClientConfig = tlsConfig
   553  	} else if grpcProxyInsecureSkipTLSVerify {
   554  		tlsConfig := &tls.Config{InsecureSkipVerify: grpcProxyInsecureSkipTLSVerify}
   555  		tr.TLSClientConfig = tlsConfig
   556  	}
   557  	return tr, nil
   558  }
   559  
   560  func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener {
   561  	murl, err := url.Parse(grpcProxyMetricsListenAddr)
   562  	if err != nil {
   563  		fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
   564  		os.Exit(1)
   565  	}
   566  	ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
   567  	if err != nil {
   568  		fmt.Fprintln(os.Stderr, err)
   569  		os.Exit(1)
   570  	}
   571  	lg.Info("gRPC proxy listening for metrics", zap.String("address", murl.String()))
   572  	return ml
   573  }
   574  

View as plain text