...

Source file src/github.com/grpc-ecosystem/go-grpc-middleware/retry/retry_test.go

Documentation: github.com/grpc-ecosystem/go-grpc-middleware/retry

     1  // Copyright 2016 Michal Witkowski. All Rights Reserved.
     2  // See LICENSE for licensing terms.
     3  
     4  package grpc_retry_test
     5  
     6  import (
     7  	"context"
     8  	"io"
     9  	"sync"
    10  	"testing"
    11  	"time"
    12  
    13  	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
    14  	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
    15  	"github.com/grpc-ecosystem/go-grpc-middleware/testing"
    16  	pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"
    17  	"github.com/stretchr/testify/assert"
    18  	"github.com/stretchr/testify/require"
    19  	"github.com/stretchr/testify/suite"
    20  	"google.golang.org/grpc"
    21  	"google.golang.org/grpc/codes"
    22  	"google.golang.org/grpc/status"
    23  )
    24  
    25  var (
    26  	retriableErrors = []codes.Code{codes.Unavailable, codes.DataLoss}
    27  	goodPing        = &pb_testproto.PingRequest{Value: "something"}
    28  	noSleep         = 0 * time.Second
    29  	retryTimeout    = 50 * time.Millisecond
    30  )
    31  
    32  type failingService struct {
    33  	pb_testproto.TestServiceServer
    34  	mu sync.Mutex
    35  
    36  	reqCounter uint
    37  	reqModulo  uint
    38  	reqSleep   time.Duration
    39  	reqError   codes.Code
    40  }
    41  
    42  func (s *failingService) resetFailingConfiguration(modulo uint, errorCode codes.Code, sleepTime time.Duration) {
    43  	s.mu.Lock()
    44  	defer s.mu.Unlock()
    45  
    46  	s.reqCounter = 0
    47  	s.reqModulo = modulo
    48  	s.reqError = errorCode
    49  	s.reqSleep = sleepTime
    50  }
    51  
    52  func (s *failingService) requestCount() uint {
    53  	s.mu.Lock()
    54  	defer s.mu.Unlock()
    55  	return s.reqCounter
    56  }
    57  
    58  func (s *failingService) maybeFailRequest() error {
    59  	s.mu.Lock()
    60  	s.reqCounter += 1
    61  	reqModulo := s.reqModulo
    62  	reqCounter := s.reqCounter
    63  	reqSleep := s.reqSleep
    64  	reqError := s.reqError
    65  	s.mu.Unlock()
    66  	if (reqModulo > 0) && (reqCounter%reqModulo == 0) {
    67  		return nil
    68  	}
    69  	time.Sleep(reqSleep)
    70  	return status.Errorf(reqError, "maybeFailRequest: failing it")
    71  }
    72  
    73  func (s *failingService) Ping(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.PingResponse, error) {
    74  	if err := s.maybeFailRequest(); err != nil {
    75  		return nil, err
    76  	}
    77  	return s.TestServiceServer.Ping(ctx, ping)
    78  }
    79  
    80  func (s *failingService) PingList(ping *pb_testproto.PingRequest, stream pb_testproto.TestService_PingListServer) error {
    81  	if err := s.maybeFailRequest(); err != nil {
    82  		return err
    83  	}
    84  	return s.TestServiceServer.PingList(ping, stream)
    85  }
    86  
    87  func (s *failingService) PingStream(stream pb_testproto.TestService_PingStreamServer) error {
    88  	if err := s.maybeFailRequest(); err != nil {
    89  		return err
    90  	}
    91  	return s.TestServiceServer.PingStream(stream)
    92  }
    93  
    94  func TestRetrySuite(t *testing.T) {
    95  	service := &failingService{
    96  		TestServiceServer: &grpc_testing.TestPingService{T: t},
    97  	}
    98  	unaryInterceptor := grpc_retry.UnaryClientInterceptor(
    99  		grpc_retry.WithCodes(retriableErrors...),
   100  		grpc_retry.WithMax(3),
   101  		grpc_retry.WithBackoff(grpc_retry.BackoffLinear(retryTimeout)),
   102  	)
   103  	streamInterceptor := grpc_retry.StreamClientInterceptor(
   104  		grpc_retry.WithCodes(retriableErrors...),
   105  		grpc_retry.WithMax(3),
   106  		grpc_retry.WithBackoff(grpc_retry.BackoffLinear(retryTimeout)),
   107  	)
   108  	s := &RetrySuite{
   109  		srv: service,
   110  		InterceptorTestSuite: &grpc_testing.InterceptorTestSuite{
   111  			TestService: service,
   112  			ClientOpts: []grpc.DialOption{
   113  				grpc.WithStreamInterceptor(streamInterceptor),
   114  				grpc.WithUnaryInterceptor(unaryInterceptor),
   115  			},
   116  		},
   117  	}
   118  	suite.Run(t, s)
   119  }
   120  
   121  type RetrySuite struct {
   122  	*grpc_testing.InterceptorTestSuite
   123  	srv *failingService
   124  }
   125  
   126  func (s *RetrySuite) SetupTest() {
   127  	s.srv.resetFailingConfiguration( /* don't fail */ 0, codes.OK, noSleep)
   128  }
   129  
   130  func (s *RetrySuite) TestUnary_FailsOnNonRetriableError() {
   131  	s.srv.resetFailingConfiguration(5, codes.Internal, noSleep)
   132  	_, err := s.Client.Ping(s.SimpleCtx(), goodPing)
   133  	require.Error(s.T(), err, "error must occur from the failing service")
   134  	require.Equal(s.T(), codes.Internal, status.Code(err), "failure code must come from retrier")
   135  	require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
   136  }
   137  
   138  func (s *RetrySuite) TestUnary_FailsOnNonRetriableContextError() {
   139  	s.srv.resetFailingConfiguration(5, codes.Canceled, noSleep)
   140  	_, err := s.Client.Ping(s.SimpleCtx(), goodPing)
   141  	require.Error(s.T(), err, "error must occur from the failing service")
   142  	require.Equal(s.T(), codes.Canceled, status.Code(err), "failure code must come from retrier")
   143  	require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
   144  }
   145  
   146  func (s *RetrySuite) TestCallOptionsDontPanicWithoutInterceptor() {
   147  	// Fix for https://github.com/grpc-ecosystem/go-grpc-middleware/issues/37
   148  	// If this code doesn't panic, that's good.
   149  	s.srv.resetFailingConfiguration(100, codes.DataLoss, noSleep) // doesn't matter all requests should fail
   150  	nonMiddlewareClient := s.NewClient()
   151  	_, err := nonMiddlewareClient.Ping(s.SimpleCtx(), goodPing,
   152  		grpc_retry.WithMax(5),
   153  		grpc_retry.WithBackoff(grpc_retry.BackoffLinear(1*time.Millisecond)),
   154  		grpc_retry.WithCodes(codes.DataLoss),
   155  		grpc_retry.WithPerRetryTimeout(1*time.Millisecond),
   156  	)
   157  	require.Error(s.T(), err)
   158  }
   159  
   160  func (s *RetrySuite) TestServerStream_FailsOnNonRetriableError() {
   161  	s.srv.resetFailingConfiguration(5, codes.Internal, noSleep)
   162  	stream, err := s.Client.PingList(s.SimpleCtx(), goodPing)
   163  	require.NoError(s.T(), err, "should not fail on establishing the stream")
   164  	_, err = stream.Recv()
   165  	require.Error(s.T(), err, "error must occur from the failing service")
   166  	require.Equal(s.T(), codes.Internal, status.Code(err), "failure code must come from retrier")
   167  }
   168  
   169  func (s *RetrySuite) TestUnary_SucceedsOnRetriableError() {
   170  	s.srv.resetFailingConfiguration(3, codes.DataLoss, noSleep) // see retriable_errors
   171  	out, err := s.Client.Ping(s.SimpleCtx(), goodPing)
   172  	require.NoError(s.T(), err, "the third invocation should succeed")
   173  	require.NotNil(s.T(), out, "Pong must be not nil")
   174  	require.EqualValues(s.T(), 3, s.srv.requestCount(), "three requests should have been made")
   175  }
   176  
   177  func (s *RetrySuite) TestUnary_OverrideFromDialOpts() {
   178  	s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep) // default is 3 and retriable_errors
   179  	out, err := s.Client.Ping(s.SimpleCtx(), goodPing, grpc_retry.WithCodes(codes.ResourceExhausted), grpc_retry.WithMax(5))
   180  	require.NoError(s.T(), err, "the fifth invocation should succeed")
   181  	require.NotNil(s.T(), out, "Pong must be not nil")
   182  	require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
   183  }
   184  
   185  func (s *RetrySuite) TestUnary_PerCallDeadline_Succeeds() {
   186  	// This tests 5 requests, with first 4 sleeping for 10 millisecond, and the retry logic firing
   187  	// a retry call with a 5 millisecond deadline. The 5th one doesn't sleep and succeeds.
   188  	deadlinePerCall := 5 * time.Millisecond
   189  	s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
   190  	out, err := s.Client.Ping(s.SimpleCtx(), goodPing, grpc_retry.WithPerRetryTimeout(deadlinePerCall),
   191  		grpc_retry.WithMax(5))
   192  	require.NoError(s.T(), err, "the fifth invocation should succeed")
   193  	require.NotNil(s.T(), out, "Pong must be not nil")
   194  	require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
   195  }
   196  
   197  func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() {
   198  	// This tests that the parent context (passed to the invocation) takes precedence over retries.
   199  	// The parent context has 150 milliseconds of deadline.
   200  	// Each failed call sleeps for 100milliseconds, and there is 5 milliseconds between each one.
   201  	// This means that unlike in TestUnary_PerCallDeadline_Succeeds, the fifth successful call won't
   202  	// be made.
   203  	parentDeadline := 150 * time.Millisecond
   204  	deadlinePerCall := 50 * time.Millisecond
   205  	// All 0-4 requests should have 10 millisecond sleeps and deadline, while the last one works.
   206  	s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
   207  	ctx, _ := context.WithTimeout(context.TODO(), parentDeadline)
   208  	_, err := s.Client.Ping(ctx, goodPing, grpc_retry.WithPerRetryTimeout(deadlinePerCall),
   209  		grpc_retry.WithMax(5))
   210  	require.Error(s.T(), err, "the retries must fail due to context deadline exceeded")
   211  	require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
   212  }
   213  
   214  func (s *RetrySuite) TestServerStream_SucceedsOnRetriableError() {
   215  	s.srv.resetFailingConfiguration(3, codes.DataLoss, noSleep) // see retriable_errors
   216  	stream, err := s.Client.PingList(s.SimpleCtx(), goodPing)
   217  	require.NoError(s.T(), err, "establishing the connection must always succeed")
   218  	s.assertPingListWasCorrect(stream)
   219  	require.EqualValues(s.T(), 3, s.srv.requestCount(), "three requests should have been made")
   220  }
   221  
   222  func (s *RetrySuite) TestServerStream_OverrideFromContext() {
   223  	s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep) // default is 3 and retriable_errors
   224  	stream, err := s.Client.PingList(s.SimpleCtx(), goodPing, grpc_retry.WithCodes(codes.ResourceExhausted), grpc_retry.WithMax(5))
   225  	require.NoError(s.T(), err, "establishing the connection must always succeed")
   226  	s.assertPingListWasCorrect(stream)
   227  	require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
   228  }
   229  
   230  func (s *RetrySuite) TestServerStream_PerCallDeadline_Succeeds() {
   231  	// This tests 5 requests, with first 4 sleeping for 100 millisecond, and the retry logic firing
   232  	// a retry call with a 50 millisecond deadline. The 5th one doesn't sleep and succeeds.
   233  	deadlinePerCall := 50 * time.Millisecond
   234  	s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
   235  	stream, err := s.Client.PingList(s.SimpleCtx(), goodPing, grpc_retry.WithPerRetryTimeout(deadlinePerCall),
   236  		grpc_retry.WithMax(5))
   237  	require.NoError(s.T(), err, "establishing the connection must always succeed")
   238  	s.assertPingListWasCorrect(stream)
   239  	require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
   240  }
   241  
   242  func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
   243  	// This tests that the parent context (passed to the invocation) takes precedence over retries.
   244  	// The parent context has 150 milliseconds of deadline.
   245  	// Each failed call sleeps for 50milliseconds, and there is 25 milliseconds between each one.
   246  	// This means that unlike in TestServerStream_PerCallDeadline_Succeeds, the fifth successful call won't
   247  	// be made.
   248  	parentDeadline := 150 * time.Millisecond
   249  	deadlinePerCall := 50 * time.Millisecond
   250  	// All 0-4 requests should have 10 millisecond sleeps and deadline, while the last one works.
   251  	s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
   252  	parentCtx, _ := context.WithTimeout(context.TODO(), parentDeadline)
   253  	stream, err := s.Client.PingList(parentCtx, goodPing, grpc_retry.WithPerRetryTimeout(deadlinePerCall),
   254  		grpc_retry.WithMax(5))
   255  	require.NoError(s.T(), err, "establishing the connection must always succeed")
   256  	_, err = stream.Recv()
   257  	require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
   258  }
   259  
   260  func (s *RetrySuite) TestServerStream_CallFailsOnOutOfRetries() {
   261  	restarted := s.RestartServer(3 * retryTimeout)
   262  	_, err := s.Client.PingList(s.SimpleCtx(), goodPing)
   263  
   264  	require.Error(s.T(), err, "establishing the connection should not succeed")
   265  	assert.Equal(s.T(), codes.Unavailable, status.Code(err))
   266  
   267  	<-restarted
   268  }
   269  
   270  func (s *RetrySuite) TestServerStream_CallFailsOnDeadlineExceeded() {
   271  	restarted := s.RestartServer(3 * retryTimeout)
   272  	ctx, _ := context.WithTimeout(context.TODO(), retryTimeout)
   273  	_, err := s.Client.PingList(ctx, goodPing)
   274  
   275  	require.Error(s.T(), err, "establishing the connection should not succeed")
   276  	assert.Equal(s.T(), codes.DeadlineExceeded, status.Code(err))
   277  
   278  	<-restarted
   279  }
   280  
   281  func (s *RetrySuite) TestServerStream_CallRetrySucceeds() {
   282  	restarted := s.RestartServer(retryTimeout)
   283  
   284  	_, err := s.Client.PingList(s.SimpleCtx(), goodPing,
   285  		grpc_retry.WithMax(40),
   286  	)
   287  
   288  	assert.NoError(s.T(), err, "establishing the connection should succeed")
   289  	<-restarted
   290  }
   291  
   292  func (s *RetrySuite) assertPingListWasCorrect(stream pb_testproto.TestService_PingListClient) {
   293  	count := 0
   294  	for {
   295  		pong, err := stream.Recv()
   296  		if err == io.EOF {
   297  			break
   298  		}
   299  		require.NotNil(s.T(), pong, "received values must not be nil")
   300  		require.NoError(s.T(), err, "no errors during receive on client side")
   301  		require.Equal(s.T(), goodPing.Value, pong.Value, "the returned pong contained the outgoing ping")
   302  		count += 1
   303  	}
   304  	require.EqualValues(s.T(), grpc_testing.ListResponseCount, count, "should have received all ping items")
   305  }
   306  
   307  type trackedInterceptor struct {
   308  	called int
   309  }
   310  
   311  func (ti *trackedInterceptor) UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
   312  	ti.called++
   313  	return invoker(ctx, method, req, reply, cc, opts...)
   314  }
   315  
   316  func (ti *trackedInterceptor) StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
   317  	ti.called++
   318  	return streamer(ctx, desc, cc, method, opts...)
   319  }
   320  
   321  func TestChainedRetrySuite(t *testing.T) {
   322  	service := &failingService{
   323  		TestServiceServer: &grpc_testing.TestPingService{T: t},
   324  	}
   325  	preRetryInterceptor := &trackedInterceptor{}
   326  	postRetryInterceptor := &trackedInterceptor{}
   327  	s := &ChainedRetrySuite{
   328  		srv:                  service,
   329  		preRetryInterceptor:  preRetryInterceptor,
   330  		postRetryInterceptor: postRetryInterceptor,
   331  		InterceptorTestSuite: &grpc_testing.InterceptorTestSuite{
   332  			TestService: service,
   333  			ClientOpts: []grpc.DialOption{
   334  				grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(preRetryInterceptor.UnaryClientInterceptor, grpc_retry.UnaryClientInterceptor(), postRetryInterceptor.UnaryClientInterceptor)),
   335  				grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(preRetryInterceptor.StreamClientInterceptor, grpc_retry.StreamClientInterceptor(), postRetryInterceptor.StreamClientInterceptor)),
   336  			},
   337  		},
   338  	}
   339  	suite.Run(t, s)
   340  }
   341  
   342  type ChainedRetrySuite struct {
   343  	*grpc_testing.InterceptorTestSuite
   344  	srv                  *failingService
   345  	preRetryInterceptor  *trackedInterceptor
   346  	postRetryInterceptor *trackedInterceptor
   347  }
   348  
   349  func (s *ChainedRetrySuite) SetupTest() {
   350  	s.srv.resetFailingConfiguration( /* don't fail */ 0, codes.OK, noSleep)
   351  	s.preRetryInterceptor.called = 0
   352  	s.postRetryInterceptor.called = 0
   353  }
   354  
   355  func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_NoFailure() {
   356  	_, err := s.Client.Ping(s.SimpleCtx(), goodPing, grpc_retry.WithMax(2))
   357  	require.NoError(s.T(), err, "the invocation should succeed")
   358  	require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
   359  	require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
   360  	require.EqualValues(s.T(), 1, s.postRetryInterceptor.called, "post-retry interceptor should be called once")
   361  }
   362  
   363  func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_WithRetry() {
   364  	s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep)
   365  	_, err := s.Client.Ping(s.SimpleCtx(), goodPing, grpc_retry.WithMax(2))
   366  	require.NoError(s.T(), err, "the second invocation should succeed")
   367  	require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made")
   368  	require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
   369  	require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice")
   370  }
   371  
   372  func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_NoFailure() {
   373  	stream, err := s.Client.PingList(s.SimpleCtx(), goodPing, grpc_retry.WithMax(2))
   374  	require.NoError(s.T(), err, "the invocation should succeed")
   375  	_, err = stream.Recv()
   376  	require.NoError(s.T(), err, "the Recv should succeed")
   377  	require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
   378  	require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
   379  	require.EqualValues(s.T(), 1, s.postRetryInterceptor.called, "post-retry interceptor should be called once")
   380  }
   381  
   382  func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_WithRetry() {
   383  	s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep)
   384  	stream, err := s.Client.PingList(s.SimpleCtx(), goodPing, grpc_retry.WithMax(2))
   385  	require.NoError(s.T(), err, "the second invocation should succeed")
   386  	_, err = stream.Recv()
   387  	require.NoError(s.T(), err, "the Recv should succeed")
   388  	require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made")
   389  	require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
   390  	require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice")
   391  }
   392  

View as plain text