1
2
3
4 package grpc_prometheus
5
6 import (
7 "bufio"
8 "io"
9 "net"
10 "net/http"
11 "net/http/httptest"
12 "strconv"
13 "strings"
14 "testing"
15 "time"
16
17 pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto"
18 "github.com/prometheus/client_golang/prometheus"
19 "github.com/stretchr/testify/assert"
20 "github.com/stretchr/testify/require"
21 "github.com/stretchr/testify/suite"
22 "golang.org/x/net/context"
23 "google.golang.org/grpc"
24 "google.golang.org/grpc/codes"
25 "google.golang.org/grpc/status"
26 )
27
28 var (
29
30 _ prometheus.Collector = NewServerMetrics()
31 )
32
// pingDefaultValue is the payload the PingEmpty handler puts in its response.
const pingDefaultValue = "I like kittens."

// countListResponses is the number of messages the PingList handler streams
// back for a request that does not ask for an error.
const countListResponses = 20
37
38 func TestServerInterceptorSuite(t *testing.T) {
39 suite.Run(t, &ServerInterceptorTestSuite{})
40 }
41
42 type ServerInterceptorTestSuite struct {
43 suite.Suite
44
45 serverListener net.Listener
46 server *grpc.Server
47 clientConn *grpc.ClientConn
48 testClient pb_testproto.TestServiceClient
49 ctx context.Context
50 }
51
52 func (s *ServerInterceptorTestSuite) SetupSuite() {
53 var err error
54
55 EnableHandlingTimeHistogram()
56
57 s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
58 require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
59
60
61 s.server = grpc.NewServer(
62 grpc.StreamInterceptor(StreamServerInterceptor),
63 grpc.UnaryInterceptor(UnaryServerInterceptor),
64 )
65 pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()})
66
67 go func() {
68 s.server.Serve(s.serverListener)
69 }()
70
71 s.clientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second))
72 require.NoError(s.T(), err, "must not error on client Dial")
73 s.testClient = pb_testproto.NewTestServiceClient(s.clientConn)
74
75
76 Register(s.server)
77 }
78
79 func (s *ServerInterceptorTestSuite) SetupTest() {
80
81 s.ctx, _ = context.WithTimeout(context.TODO(), 2*time.Second)
82 }
83
84 func (s *ServerInterceptorTestSuite) TearDownSuite() {
85 if s.serverListener != nil {
86 s.server.Stop()
87 s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String())
88 s.serverListener.Close()
89
90 }
91 if s.clientConn != nil {
92 s.clientConn.Close()
93 }
94 }
95
96 func (s *ServerInterceptorTestSuite) TestRegisterPresetsStuff() {
97 for testID, testCase := range []struct {
98 metricName string
99 existingLabels []string
100 }{
101 {"grpc_server_started_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary"}},
102 {"grpc_server_started_total", []string{"mwitkow.testproto.TestService", "PingList", "server_stream"}},
103 {"grpc_server_msg_received_total", []string{"mwitkow.testproto.TestService", "PingList", "server_stream"}},
104 {"grpc_server_msg_sent_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary"}},
105 {"grpc_server_handling_seconds_sum", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary"}},
106 {"grpc_server_handling_seconds_count", []string{"mwitkow.testproto.TestService", "PingList", "server_stream"}},
107 {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingList", "server_stream", "OutOfRange"}},
108 {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingList", "server_stream", "Aborted"}},
109 {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary", "FailedPrecondition"}},
110 {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary", "ResourceExhausted"}},
111 } {
112 lineCount := len(fetchPrometheusLines(s.T(), testCase.metricName, testCase.existingLabels...))
113 assert.NotEqual(s.T(), 0, lineCount, "metrics must exist for test case %d", testID)
114 }
115 }
116
117 func (s *ServerInterceptorTestSuite) TestUnaryIncrementsStarted() {
118 var before int
119 var after int
120
121 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingEmpty", "unary")
122 s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{})
123 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingEmpty", "unary")
124 assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for PingEmpty")
125
126 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingError", "unary")
127 s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.Unavailable)})
128 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingError", "unary")
129 assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for PingError")
130 }
131
132 func (s *ServerInterceptorTestSuite) TestUnaryIncrementsHandled() {
133 var before int
134 var after int
135
136 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingEmpty", "unary", "OK")
137 s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{})
138 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingEmpty", "unary", "OK")
139 assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_count should be incremented for PingEmpty")
140
141 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingError", "unary", "FailedPrecondition")
142 s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)})
143 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingError", "unary", "FailedPrecondition")
144 assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingError")
145 }
146
147 func (s *ServerInterceptorTestSuite) TestUnaryIncrementsHistograms() {
148 var before int
149 var after int
150
151 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingEmpty", "unary")
152 s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{})
153 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingEmpty", "unary")
154 assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_count should be incremented for PingEmpty")
155
156 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingError", "unary")
157 s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)})
158 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingError", "unary")
159 assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingError")
160 }
161
162 func (s *ServerInterceptorTestSuite) TestStreamingIncrementsStarted() {
163 var before int
164 var after int
165
166 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingList", "server_stream")
167 s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{})
168 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingList", "server_stream")
169 assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for PingList")
170 }
171
172 func (s *ServerInterceptorTestSuite) TestStreamingIncrementsHistograms() {
173 var before int
174 var after int
175
176 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream")
177 ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{})
178
179 for {
180 _, err := ss.Recv()
181 if err == io.EOF {
182 break
183 }
184 require.NoError(s.T(), err, "reading pingList shouldn't fail")
185 }
186 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream")
187 assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingList OK")
188
189 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream")
190 _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)})
191 require.NoError(s.T(), err, "PingList must not fail immediately")
192
193 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream")
194 assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingList FailedPrecondition")
195 }
196
197 func (s *ServerInterceptorTestSuite) TestStreamingIncrementsHandled() {
198 var before int
199 var after int
200
201 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "OK")
202 ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{})
203
204 for {
205 _, err := ss.Recv()
206 if err == io.EOF {
207 break
208 }
209 require.NoError(s.T(), err, "reading pingList shouldn't fail")
210 }
211 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "OK")
212 assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingList OK")
213
214 before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "FailedPrecondition")
215 _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)})
216 require.NoError(s.T(), err, "PingList must not fail immediately")
217
218 after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "FailedPrecondition")
219 assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingList FailedPrecondition")
220 }
221
222 func (s *ServerInterceptorTestSuite) TestStreamingIncrementsMessageCounts() {
223 beforeRecv := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_received_total", "PingList", "server_stream")
224 beforeSent := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_sent_total", "PingList", "server_stream")
225 ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{})
226
227 count := 0
228 for {
229 _, err := ss.Recv()
230 if err == io.EOF {
231 break
232 }
233 require.NoError(s.T(), err, "reading pingList shouldn't fail")
234 count++
235 }
236 require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match")
237 afterSent := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_sent_total", "PingList", "server_stream")
238 afterRecv := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_received_total", "PingList", "server_stream")
239
240 assert.EqualValues(s.T(), beforeSent+countListResponses, afterSent, "grpc_server_msg_sent_total should be incremented 20 times for PingList")
241 assert.EqualValues(s.T(), beforeRecv+1, afterRecv, "grpc_server_msg_sent_total should be incremented ones for PingList ")
242 }
243
244 func fetchPrometheusLines(t *testing.T, metricName string, matchingLabelValues ...string) []string {
245 resp := httptest.NewRecorder()
246 req, err := http.NewRequest("GET", "/", nil)
247 require.NoError(t, err, "failed creating request for Prometheus handler")
248 prometheus.Handler().ServeHTTP(resp, req)
249 reader := bufio.NewReader(resp.Body)
250 ret := []string{}
251 for {
252 line, err := reader.ReadString('\n')
253 if err == io.EOF {
254 break
255 } else {
256 require.NoError(t, err, "error reading stuff")
257 }
258 if !strings.HasPrefix(line, metricName) {
259 continue
260 }
261 matches := true
262 for _, labelValue := range matchingLabelValues {
263 if !strings.Contains(line, `"`+labelValue+`"`) {
264 matches = false
265 }
266 }
267 if matches {
268 ret = append(ret, line)
269 }
270
271 }
272 return ret
273 }
274
275 func sumCountersForMetricAndLabels(t *testing.T, metricName string, matchingLabelValues ...string) int {
276 count := 0
277 for _, line := range fetchPrometheusLines(t, metricName, matchingLabelValues...) {
278 valueString := line[strings.LastIndex(line, " ")+1 : len(line)-1]
279 valueFloat, err := strconv.ParseFloat(valueString, 32)
280 require.NoError(t, err, "failed parsing value for line: %v", line)
281 count += int(valueFloat)
282 }
283 return count
284 }
285
// testService is the in-process implementation of the test gRPC service that
// the suite registers on its server.
type testService struct {
	// t is the testing handle supplied when the service is constructed.
	t *testing.T
}
289
290 func (s *testService) PingEmpty(ctx context.Context, _ *pb_testproto.Empty) (*pb_testproto.PingResponse, error) {
291 return &pb_testproto.PingResponse{Value: pingDefaultValue, Counter: 42}, nil
292 }
293
294 func (s *testService) Ping(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.PingResponse, error) {
295
296 return &pb_testproto.PingResponse{Value: ping.Value, Counter: 42}, nil
297 }
298
299 func (s *testService) PingError(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.Empty, error) {
300 code := codes.Code(ping.ErrorCodeReturned)
301 return nil, status.Errorf(code, "Userspace error.")
302 }
303
304 func (s *testService) PingList(ping *pb_testproto.PingRequest, stream pb_testproto.TestService_PingListServer) error {
305 if ping.ErrorCodeReturned != 0 {
306 return status.Errorf(codes.Code(ping.ErrorCodeReturned), "foobar")
307 }
308
309 for i := 0; i < countListResponses; i++ {
310 stream.Send(&pb_testproto.PingResponse{Value: ping.Value, Counter: int32(i)})
311 }
312 return nil
313 }
314
View as plain text