1 package grpc_prometheus
2
3 import (
4 "io"
5
6 prom "github.com/prometheus/client_golang/prometheus"
7 "golang.org/x/net/context"
8 "google.golang.org/grpc"
9 "google.golang.org/grpc/codes"
10 "google.golang.org/grpc/status"
11 )
12
13
14
15 type ClientMetrics struct {
16 clientStartedCounter *prom.CounterVec
17 clientHandledCounter *prom.CounterVec
18 clientStreamMsgReceived *prom.CounterVec
19 clientStreamMsgSent *prom.CounterVec
20 clientHandledHistogramEnabled bool
21 clientHandledHistogramOpts prom.HistogramOpts
22 clientHandledHistogram *prom.HistogramVec
23 }
24
25
26
27
28
29 func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
30 opts := counterOptions(counterOpts)
31 return &ClientMetrics{
32 clientStartedCounter: prom.NewCounterVec(
33 opts.apply(prom.CounterOpts{
34 Name: "grpc_client_started_total",
35 Help: "Total number of RPCs started on the client.",
36 }), []string{"grpc_type", "grpc_service", "grpc_method"}),
37
38 clientHandledCounter: prom.NewCounterVec(
39 opts.apply(prom.CounterOpts{
40 Name: "grpc_client_handled_total",
41 Help: "Total number of RPCs completed by the client, regardless of success or failure.",
42 }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
43
44 clientStreamMsgReceived: prom.NewCounterVec(
45 opts.apply(prom.CounterOpts{
46 Name: "grpc_client_msg_received_total",
47 Help: "Total number of RPC stream messages received by the client.",
48 }), []string{"grpc_type", "grpc_service", "grpc_method"}),
49
50 clientStreamMsgSent: prom.NewCounterVec(
51 opts.apply(prom.CounterOpts{
52 Name: "grpc_client_msg_sent_total",
53 Help: "Total number of gRPC stream messages sent by the client.",
54 }), []string{"grpc_type", "grpc_service", "grpc_method"}),
55
56 clientHandledHistogramEnabled: false,
57 clientHandledHistogramOpts: prom.HistogramOpts{
58 Name: "grpc_client_handling_seconds",
59 Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
60 Buckets: prom.DefBuckets,
61 },
62 clientHandledHistogram: nil,
63 }
64 }
65
66
67
68
69 func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
70 m.clientStartedCounter.Describe(ch)
71 m.clientHandledCounter.Describe(ch)
72 m.clientStreamMsgReceived.Describe(ch)
73 m.clientStreamMsgSent.Describe(ch)
74 if m.clientHandledHistogramEnabled {
75 m.clientHandledHistogram.Describe(ch)
76 }
77 }
78
79
80
81
82 func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
83 m.clientStartedCounter.Collect(ch)
84 m.clientHandledCounter.Collect(ch)
85 m.clientStreamMsgReceived.Collect(ch)
86 m.clientStreamMsgSent.Collect(ch)
87 if m.clientHandledHistogramEnabled {
88 m.clientHandledHistogram.Collect(ch)
89 }
90 }
91
92
93
94 func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
95 for _, o := range opts {
96 o(&m.clientHandledHistogramOpts)
97 }
98 if !m.clientHandledHistogramEnabled {
99 m.clientHandledHistogram = prom.NewHistogramVec(
100 m.clientHandledHistogramOpts,
101 []string{"grpc_type", "grpc_service", "grpc_method"},
102 )
103 }
104 m.clientHandledHistogramEnabled = true
105 }
106
107
108 func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
109 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
110 monitor := newClientReporter(m, Unary, method)
111 monitor.SentMessage()
112 err := invoker(ctx, method, req, reply, cc, opts...)
113 if err != nil {
114 monitor.ReceivedMessage()
115 }
116 st, _ := status.FromError(err)
117 monitor.Handled(st.Code())
118 return err
119 }
120 }
121
122
123 func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
124 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
125 monitor := newClientReporter(m, clientStreamType(desc), method)
126 clientStream, err := streamer(ctx, desc, cc, method, opts...)
127 if err != nil {
128 st, _ := status.FromError(err)
129 monitor.Handled(st.Code())
130 return nil, err
131 }
132 return &monitoredClientStream{clientStream, monitor}, nil
133 }
134 }
135
136 func clientStreamType(desc *grpc.StreamDesc) grpcType {
137 if desc.ClientStreams && !desc.ServerStreams {
138 return ClientStream
139 } else if !desc.ClientStreams && desc.ServerStreams {
140 return ServerStream
141 }
142 return BidiStream
143 }
144
145
146 type monitoredClientStream struct {
147 grpc.ClientStream
148 monitor *clientReporter
149 }
150
151 func (s *monitoredClientStream) SendMsg(m interface{}) error {
152 err := s.ClientStream.SendMsg(m)
153 if err == nil {
154 s.monitor.SentMessage()
155 }
156 return err
157 }
158
159 func (s *monitoredClientStream) RecvMsg(m interface{}) error {
160 err := s.ClientStream.RecvMsg(m)
161 if err == nil {
162 s.monitor.ReceivedMessage()
163 } else if err == io.EOF {
164 s.monitor.Handled(codes.OK)
165 } else {
166 st, _ := status.FromError(err)
167 s.monitor.Handled(st.Code())
168 }
169 return err
170 }
171
View as plain text