1 package grpc_prometheus
2
3 import (
4 prom "github.com/prometheus/client_golang/prometheus"
5 "golang.org/x/net/context"
6 "google.golang.org/grpc"
7 "google.golang.org/grpc/status"
8 )
9
10
11
12 type ServerMetrics struct {
13 serverStartedCounter *prom.CounterVec
14 serverHandledCounter *prom.CounterVec
15 serverStreamMsgReceived *prom.CounterVec
16 serverStreamMsgSent *prom.CounterVec
17 serverHandledHistogramEnabled bool
18 serverHandledHistogramOpts prom.HistogramOpts
19 serverHandledHistogram *prom.HistogramVec
20 }
21
22
23
24
25
26 func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
27 opts := counterOptions(counterOpts)
28 return &ServerMetrics{
29 serverStartedCounter: prom.NewCounterVec(
30 opts.apply(prom.CounterOpts{
31 Name: "grpc_server_started_total",
32 Help: "Total number of RPCs started on the server.",
33 }), []string{"grpc_type", "grpc_service", "grpc_method"}),
34 serverHandledCounter: prom.NewCounterVec(
35 opts.apply(prom.CounterOpts{
36 Name: "grpc_server_handled_total",
37 Help: "Total number of RPCs completed on the server, regardless of success or failure.",
38 }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
39 serverStreamMsgReceived: prom.NewCounterVec(
40 opts.apply(prom.CounterOpts{
41 Name: "grpc_server_msg_received_total",
42 Help: "Total number of RPC stream messages received on the server.",
43 }), []string{"grpc_type", "grpc_service", "grpc_method"}),
44 serverStreamMsgSent: prom.NewCounterVec(
45 opts.apply(prom.CounterOpts{
46 Name: "grpc_server_msg_sent_total",
47 Help: "Total number of gRPC stream messages sent by the server.",
48 }), []string{"grpc_type", "grpc_service", "grpc_method"}),
49 serverHandledHistogramEnabled: false,
50 serverHandledHistogramOpts: prom.HistogramOpts{
51 Name: "grpc_server_handling_seconds",
52 Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
53 Buckets: prom.DefBuckets,
54 },
55 serverHandledHistogram: nil,
56 }
57 }
58
59
60
61
62
63 func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
64 for _, o := range opts {
65 o(&m.serverHandledHistogramOpts)
66 }
67 if !m.serverHandledHistogramEnabled {
68 m.serverHandledHistogram = prom.NewHistogramVec(
69 m.serverHandledHistogramOpts,
70 []string{"grpc_type", "grpc_service", "grpc_method"},
71 )
72 }
73 m.serverHandledHistogramEnabled = true
74 }
75
76
77
78
79 func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
80 m.serverStartedCounter.Describe(ch)
81 m.serverHandledCounter.Describe(ch)
82 m.serverStreamMsgReceived.Describe(ch)
83 m.serverStreamMsgSent.Describe(ch)
84 if m.serverHandledHistogramEnabled {
85 m.serverHandledHistogram.Describe(ch)
86 }
87 }
88
89
90
91
92 func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
93 m.serverStartedCounter.Collect(ch)
94 m.serverHandledCounter.Collect(ch)
95 m.serverStreamMsgReceived.Collect(ch)
96 m.serverStreamMsgSent.Collect(ch)
97 if m.serverHandledHistogramEnabled {
98 m.serverHandledHistogram.Collect(ch)
99 }
100 }
101
102
103 func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
104 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
105 monitor := newServerReporter(m, Unary, info.FullMethod)
106 monitor.ReceivedMessage()
107 resp, err := handler(ctx, req)
108 st, _ := status.FromError(err)
109 monitor.Handled(st.Code())
110 if err == nil {
111 monitor.SentMessage()
112 }
113 return resp, err
114 }
115 }
116
117
118 func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
119 return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
120 monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
121 err := handler(srv, &monitoredServerStream{ss, monitor})
122 st, _ := status.FromError(err)
123 monitor.Handled(st.Code())
124 return err
125 }
126 }
127
128
129
130
131 func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
132 serviceInfo := server.GetServiceInfo()
133 for serviceName, info := range serviceInfo {
134 for _, mInfo := range info.Methods {
135 preRegisterMethod(m, serviceName, &mInfo)
136 }
137 }
138 }
139
140 func streamRPCType(info *grpc.StreamServerInfo) grpcType {
141 if info.IsClientStream && !info.IsServerStream {
142 return ClientStream
143 } else if !info.IsClientStream && info.IsServerStream {
144 return ServerStream
145 }
146 return BidiStream
147 }
148
149
150 type monitoredServerStream struct {
151 grpc.ServerStream
152 monitor *serverReporter
153 }
154
155 func (s *monitoredServerStream) SendMsg(m interface{}) error {
156 err := s.ServerStream.SendMsg(m)
157 if err == nil {
158 s.monitor.SentMessage()
159 }
160 return err
161 }
162
163 func (s *monitoredServerStream) RecvMsg(m interface{}) error {
164 err := s.ServerStream.RecvMsg(m)
165 if err == nil {
166 s.monitor.ReceivedMessage()
167 }
168 return err
169 }
170
171
172 func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
173 methodName := mInfo.Name
174 methodType := string(typeFromMethodInfo(mInfo))
175
176 metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
177 metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
178 metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
179 if metrics.serverHandledHistogramEnabled {
180 metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
181 }
182 for _, code := range allCodes {
183 metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
184 }
185 }
186
View as plain text