1
2
3
4 package grpc_retry_test
5
6 import (
7 "context"
8 "io"
9 "sync"
10 "testing"
11 "time"
12
13 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
14 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
15 "github.com/grpc-ecosystem/go-grpc-middleware/testing"
16 pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"
17 "github.com/stretchr/testify/assert"
18 "github.com/stretchr/testify/require"
19 "github.com/stretchr/testify/suite"
20 "google.golang.org/grpc"
21 "google.golang.org/grpc/codes"
22 "google.golang.org/grpc/status"
23 )
24
25 var (
26 retriableErrors = []codes.Code{codes.Unavailable, codes.DataLoss}
27 goodPing = &pb_testproto.PingRequest{Value: "something"}
28 noSleep = 0 * time.Second
29 retryTimeout = 50 * time.Millisecond
30 )
31
32 type failingService struct {
33 pb_testproto.TestServiceServer
34 mu sync.Mutex
35
36 reqCounter uint
37 reqModulo uint
38 reqSleep time.Duration
39 reqError codes.Code
40 }
41
42 func (s *failingService) resetFailingConfiguration(modulo uint, errorCode codes.Code, sleepTime time.Duration) {
43 s.mu.Lock()
44 defer s.mu.Unlock()
45
46 s.reqCounter = 0
47 s.reqModulo = modulo
48 s.reqError = errorCode
49 s.reqSleep = sleepTime
50 }
51
52 func (s *failingService) requestCount() uint {
53 s.mu.Lock()
54 defer s.mu.Unlock()
55 return s.reqCounter
56 }
57
58 func (s *failingService) maybeFailRequest() error {
59 s.mu.Lock()
60 s.reqCounter += 1
61 reqModulo := s.reqModulo
62 reqCounter := s.reqCounter
63 reqSleep := s.reqSleep
64 reqError := s.reqError
65 s.mu.Unlock()
66 if (reqModulo > 0) && (reqCounter%reqModulo == 0) {
67 return nil
68 }
69 time.Sleep(reqSleep)
70 return status.Errorf(reqError, "maybeFailRequest: failing it")
71 }
72
73 func (s *failingService) Ping(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.PingResponse, error) {
74 if err := s.maybeFailRequest(); err != nil {
75 return nil, err
76 }
77 return s.TestServiceServer.Ping(ctx, ping)
78 }
79
80 func (s *failingService) PingList(ping *pb_testproto.PingRequest, stream pb_testproto.TestService_PingListServer) error {
81 if err := s.maybeFailRequest(); err != nil {
82 return err
83 }
84 return s.TestServiceServer.PingList(ping, stream)
85 }
86
87 func (s *failingService) PingStream(stream pb_testproto.TestService_PingStreamServer) error {
88 if err := s.maybeFailRequest(); err != nil {
89 return err
90 }
91 return s.TestServiceServer.PingStream(stream)
92 }
93
94 func TestRetrySuite(t *testing.T) {
95 service := &failingService{
96 TestServiceServer: &grpc_testing.TestPingService{T: t},
97 }
98 unaryInterceptor := grpc_retry.UnaryClientInterceptor(
99 grpc_retry.WithCodes(retriableErrors...),
100 grpc_retry.WithMax(3),
101 grpc_retry.WithBackoff(grpc_retry.BackoffLinear(retryTimeout)),
102 )
103 streamInterceptor := grpc_retry.StreamClientInterceptor(
104 grpc_retry.WithCodes(retriableErrors...),
105 grpc_retry.WithMax(3),
106 grpc_retry.WithBackoff(grpc_retry.BackoffLinear(retryTimeout)),
107 )
108 s := &RetrySuite{
109 srv: service,
110 InterceptorTestSuite: &grpc_testing.InterceptorTestSuite{
111 TestService: service,
112 ClientOpts: []grpc.DialOption{
113 grpc.WithStreamInterceptor(streamInterceptor),
114 grpc.WithUnaryInterceptor(unaryInterceptor),
115 },
116 },
117 }
118 suite.Run(t, s)
119 }
120
121 type RetrySuite struct {
122 *grpc_testing.InterceptorTestSuite
123 srv *failingService
124 }
125
126 func (s *RetrySuite) SetupTest() {
127 s.srv.resetFailingConfiguration( 0, codes.OK, noSleep)
128 }
129
130 func (s *RetrySuite) TestUnary_FailsOnNonRetriableError() {
131 s.srv.resetFailingConfiguration(5, codes.Internal, noSleep)
132 _, err := s.Client.Ping(s.SimpleCtx(), goodPing)
133 require.Error(s.T(), err, "error must occur from the failing service")
134 require.Equal(s.T(), codes.Internal, status.Code(err), "failure code must come from retrier")
135 require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
136 }
137
138 func (s *RetrySuite) TestUnary_FailsOnNonRetriableContextError() {
139 s.srv.resetFailingConfiguration(5, codes.Canceled, noSleep)
140 _, err := s.Client.Ping(s.SimpleCtx(), goodPing)
141 require.Error(s.T(), err, "error must occur from the failing service")
142 require.Equal(s.T(), codes.Canceled, status.Code(err), "failure code must come from retrier")
143 require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
144 }
145
146 func (s *RetrySuite) TestCallOptionsDontPanicWithoutInterceptor() {
147
148
149 s.srv.resetFailingConfiguration(100, codes.DataLoss, noSleep)
150 nonMiddlewareClient := s.NewClient()
151 _, err := nonMiddlewareClient.Ping(s.SimpleCtx(), goodPing,
152 grpc_retry.WithMax(5),
153 grpc_retry.WithBackoff(grpc_retry.BackoffLinear(1*time.Millisecond)),
154 grpc_retry.WithCodes(codes.DataLoss),
155 grpc_retry.WithPerRetryTimeout(1*time.Millisecond),
156 )
157 require.Error(s.T(), err)
158 }
159
160 func (s *RetrySuite) TestServerStream_FailsOnNonRetriableError() {
161 s.srv.resetFailingConfiguration(5, codes.Internal, noSleep)
162 stream, err := s.Client.PingList(s.SimpleCtx(), goodPing)
163 require.NoError(s.T(), err, "should not fail on establishing the stream")
164 _, err = stream.Recv()
165 require.Error(s.T(), err, "error must occur from the failing service")
166 require.Equal(s.T(), codes.Internal, status.Code(err), "failure code must come from retrier")
167 }
168
169 func (s *RetrySuite) TestUnary_SucceedsOnRetriableError() {
170 s.srv.resetFailingConfiguration(3, codes.DataLoss, noSleep)
171 out, err := s.Client.Ping(s.SimpleCtx(), goodPing)
172 require.NoError(s.T(), err, "the third invocation should succeed")
173 require.NotNil(s.T(), out, "Pong must be not nil")
174 require.EqualValues(s.T(), 3, s.srv.requestCount(), "three requests should have been made")
175 }
176
177 func (s *RetrySuite) TestUnary_OverrideFromDialOpts() {
178 s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep)
179 out, err := s.Client.Ping(s.SimpleCtx(), goodPing, grpc_retry.WithCodes(codes.ResourceExhausted), grpc_retry.WithMax(5))
180 require.NoError(s.T(), err, "the fifth invocation should succeed")
181 require.NotNil(s.T(), out, "Pong must be not nil")
182 require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
183 }
184
185 func (s *RetrySuite) TestUnary_PerCallDeadline_Succeeds() {
186
187
188 deadlinePerCall := 5 * time.Millisecond
189 s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
190 out, err := s.Client.Ping(s.SimpleCtx(), goodPing, grpc_retry.WithPerRetryTimeout(deadlinePerCall),
191 grpc_retry.WithMax(5))
192 require.NoError(s.T(), err, "the fifth invocation should succeed")
193 require.NotNil(s.T(), out, "Pong must be not nil")
194 require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
195 }
196
197 func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() {
198
199
200
201
202
203 parentDeadline := 150 * time.Millisecond
204 deadlinePerCall := 50 * time.Millisecond
205
206 s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
207 ctx, _ := context.WithTimeout(context.TODO(), parentDeadline)
208 _, err := s.Client.Ping(ctx, goodPing, grpc_retry.WithPerRetryTimeout(deadlinePerCall),
209 grpc_retry.WithMax(5))
210 require.Error(s.T(), err, "the retries must fail due to context deadline exceeded")
211 require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
212 }
213
214 func (s *RetrySuite) TestServerStream_SucceedsOnRetriableError() {
215 s.srv.resetFailingConfiguration(3, codes.DataLoss, noSleep)
216 stream, err := s.Client.PingList(s.SimpleCtx(), goodPing)
217 require.NoError(s.T(), err, "establishing the connection must always succeed")
218 s.assertPingListWasCorrect(stream)
219 require.EqualValues(s.T(), 3, s.srv.requestCount(), "three requests should have been made")
220 }
221
222 func (s *RetrySuite) TestServerStream_OverrideFromContext() {
223 s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep)
224 stream, err := s.Client.PingList(s.SimpleCtx(), goodPing, grpc_retry.WithCodes(codes.ResourceExhausted), grpc_retry.WithMax(5))
225 require.NoError(s.T(), err, "establishing the connection must always succeed")
226 s.assertPingListWasCorrect(stream)
227 require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
228 }
229
230 func (s *RetrySuite) TestServerStream_PerCallDeadline_Succeeds() {
231
232
233 deadlinePerCall := 50 * time.Millisecond
234 s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
235 stream, err := s.Client.PingList(s.SimpleCtx(), goodPing, grpc_retry.WithPerRetryTimeout(deadlinePerCall),
236 grpc_retry.WithMax(5))
237 require.NoError(s.T(), err, "establishing the connection must always succeed")
238 s.assertPingListWasCorrect(stream)
239 require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
240 }
241
242 func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
243
244
245
246
247
248 parentDeadline := 150 * time.Millisecond
249 deadlinePerCall := 50 * time.Millisecond
250
251 s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
252 parentCtx, _ := context.WithTimeout(context.TODO(), parentDeadline)
253 stream, err := s.Client.PingList(parentCtx, goodPing, grpc_retry.WithPerRetryTimeout(deadlinePerCall),
254 grpc_retry.WithMax(5))
255 require.NoError(s.T(), err, "establishing the connection must always succeed")
256 _, err = stream.Recv()
257 require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
258 }
259
260 func (s *RetrySuite) TestServerStream_CallFailsOnOutOfRetries() {
261 restarted := s.RestartServer(3 * retryTimeout)
262 _, err := s.Client.PingList(s.SimpleCtx(), goodPing)
263
264 require.Error(s.T(), err, "establishing the connection should not succeed")
265 assert.Equal(s.T(), codes.Unavailable, status.Code(err))
266
267 <-restarted
268 }
269
270 func (s *RetrySuite) TestServerStream_CallFailsOnDeadlineExceeded() {
271 restarted := s.RestartServer(3 * retryTimeout)
272 ctx, _ := context.WithTimeout(context.TODO(), retryTimeout)
273 _, err := s.Client.PingList(ctx, goodPing)
274
275 require.Error(s.T(), err, "establishing the connection should not succeed")
276 assert.Equal(s.T(), codes.DeadlineExceeded, status.Code(err))
277
278 <-restarted
279 }
280
281 func (s *RetrySuite) TestServerStream_CallRetrySucceeds() {
282 restarted := s.RestartServer(retryTimeout)
283
284 _, err := s.Client.PingList(s.SimpleCtx(), goodPing,
285 grpc_retry.WithMax(40),
286 )
287
288 assert.NoError(s.T(), err, "establishing the connection should succeed")
289 <-restarted
290 }
291
292 func (s *RetrySuite) assertPingListWasCorrect(stream pb_testproto.TestService_PingListClient) {
293 count := 0
294 for {
295 pong, err := stream.Recv()
296 if err == io.EOF {
297 break
298 }
299 require.NotNil(s.T(), pong, "received values must not be nil")
300 require.NoError(s.T(), err, "no errors during receive on client side")
301 require.Equal(s.T(), goodPing.Value, pong.Value, "the returned pong contained the outgoing ping")
302 count += 1
303 }
304 require.EqualValues(s.T(), grpc_testing.ListResponseCount, count, "should have received all ping items")
305 }
306
// trackedInterceptor is a pass-through client interceptor that counts its
// invocations.
//
// NOTE(review): called is not synchronized; this assumes the tests issue
// calls one at a time — confirm if tests become concurrent.
type trackedInterceptor struct {
	called int // invocations since ChainedRetrySuite.SetupTest last zeroed it
}
310
311 func (ti *trackedInterceptor) UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
312 ti.called++
313 return invoker(ctx, method, req, reply, cc, opts...)
314 }
315
316 func (ti *trackedInterceptor) StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
317 ti.called++
318 return streamer(ctx, desc, cc, method, opts...)
319 }
320
321 func TestChainedRetrySuite(t *testing.T) {
322 service := &failingService{
323 TestServiceServer: &grpc_testing.TestPingService{T: t},
324 }
325 preRetryInterceptor := &trackedInterceptor{}
326 postRetryInterceptor := &trackedInterceptor{}
327 s := &ChainedRetrySuite{
328 srv: service,
329 preRetryInterceptor: preRetryInterceptor,
330 postRetryInterceptor: postRetryInterceptor,
331 InterceptorTestSuite: &grpc_testing.InterceptorTestSuite{
332 TestService: service,
333 ClientOpts: []grpc.DialOption{
334 grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(preRetryInterceptor.UnaryClientInterceptor, grpc_retry.UnaryClientInterceptor(), postRetryInterceptor.UnaryClientInterceptor)),
335 grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(preRetryInterceptor.StreamClientInterceptor, grpc_retry.StreamClientInterceptor(), postRetryInterceptor.StreamClientInterceptor)),
336 },
337 },
338 }
339 suite.Run(t, s)
340 }
341
342 type ChainedRetrySuite struct {
343 *grpc_testing.InterceptorTestSuite
344 srv *failingService
345 preRetryInterceptor *trackedInterceptor
346 postRetryInterceptor *trackedInterceptor
347 }
348
349 func (s *ChainedRetrySuite) SetupTest() {
350 s.srv.resetFailingConfiguration( 0, codes.OK, noSleep)
351 s.preRetryInterceptor.called = 0
352 s.postRetryInterceptor.called = 0
353 }
354
355 func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_NoFailure() {
356 _, err := s.Client.Ping(s.SimpleCtx(), goodPing, grpc_retry.WithMax(2))
357 require.NoError(s.T(), err, "the invocation should succeed")
358 require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
359 require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
360 require.EqualValues(s.T(), 1, s.postRetryInterceptor.called, "post-retry interceptor should be called once")
361 }
362
363 func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_WithRetry() {
364 s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep)
365 _, err := s.Client.Ping(s.SimpleCtx(), goodPing, grpc_retry.WithMax(2))
366 require.NoError(s.T(), err, "the second invocation should succeed")
367 require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made")
368 require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
369 require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice")
370 }
371
372 func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_NoFailure() {
373 stream, err := s.Client.PingList(s.SimpleCtx(), goodPing, grpc_retry.WithMax(2))
374 require.NoError(s.T(), err, "the invocation should succeed")
375 _, err = stream.Recv()
376 require.NoError(s.T(), err, "the Recv should succeed")
377 require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
378 require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
379 require.EqualValues(s.T(), 1, s.postRetryInterceptor.called, "post-retry interceptor should be called once")
380 }
381
382 func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_WithRetry() {
383 s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep)
384 stream, err := s.Client.PingList(s.SimpleCtx(), goodPing, grpc_retry.WithMax(2))
385 require.NoError(s.T(), err, "the second invocation should succeed")
386 _, err = stream.Recv()
387 require.NoError(s.T(), err, "the Recv should succeed")
388 require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made")
389 require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
390 require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice")
391 }
392
View as plain text