...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3rpc
16
17 import (
18 "crypto/tls"
19 "math"
20
21 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
22 "go.etcd.io/etcd/client/v3/credentials"
23 "go.etcd.io/etcd/server/v3/etcdserver"
24
25 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
26 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
27 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/health"
30 healthpb "google.golang.org/grpc/health/grpc_health_v1"
31 )
32
const (
	// grpcOverheadBytes is the extra allowance (512 KiB) added on top of the
	// configured maximum request size when the server's receive message
	// limit is computed.
	grpcOverheadBytes = 512 << 10

	// maxSendBytes is the value handed to grpc.MaxSendMsgSize for the server.
	maxSendBytes = math.MaxInt32
)
37
38 func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
39 var opts []grpc.ServerOption
40 opts = append(opts, grpc.CustomCodec(&codec{}))
41 if tls != nil {
42 bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
43 opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
44 }
45 chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
46 newLogUnaryInterceptor(s),
47 newUnaryInterceptor(s),
48 grpc_prometheus.UnaryServerInterceptor,
49 }
50 if interceptor != nil {
51 chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor)
52 }
53
54 chainStreamInterceptors := []grpc.StreamServerInterceptor{
55 newStreamInterceptor(s),
56 grpc_prometheus.StreamServerInterceptor,
57 }
58
59 if s.Cfg.ExperimentalEnableDistributedTracing {
60 chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
61 chainStreamInterceptors = append(chainStreamInterceptors, otelgrpc.StreamServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
62
63 }
64
65 opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(chainUnaryInterceptors...)))
66 opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(chainStreamInterceptors...)))
67
68 opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
69 opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
70 opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))
71
72 grpcServer := grpc.NewServer(append(opts, gopts...)...)
73
74 pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
75 pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
76 pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
77 pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
78 pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
79 pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
80
81
82
83
84 hsrv := health.NewServer()
85 hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
86 healthpb.RegisterHealthServer(grpcServer, hsrv)
87
88
89 grpc_prometheus.Register(grpcServer)
90
91 return grpcServer
92 }
93
View as plain text