
Source file src/google.golang.org/grpc/test/retry_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2018 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"io"
    25  	"net"
    26  	"reflect"
    27  	"strconv"
    28  	"strings"
    29  	"sync"
    30  	"testing"
    31  	"time"
    32  
    33  	"google.golang.org/grpc"
    34  	"google.golang.org/grpc/codes"
    35  	"google.golang.org/grpc/credentials/insecure"
    36  	"google.golang.org/grpc/internal/grpcsync"
    37  	"google.golang.org/grpc/internal/stubserver"
    38  	"google.golang.org/grpc/metadata"
    39  	"google.golang.org/grpc/stats"
    40  	"google.golang.org/grpc/status"
    41  	"google.golang.org/protobuf/proto"
    42  
    43  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    44  	testpb "google.golang.org/grpc/interop/grpc_testing"
    45  )
    46  
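        // TestRetryUnary exercises the retry policy for unary RPCs: the stub server fails
        // calls with ALREADY_EXISTS (retryable under the service config below) or INTERNAL
        // (not retryable), and the table checks both the status code observed by the client
        // and the cumulative number of calls the server has handled.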
    47  func (s) TestRetryUnary(t *testing.T) {
    48  	i := -1
    49  	ss := &stubserver.StubServer{
    50  		EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
    51  			defer func() { t.Logf("server call %v returning err %v", i, err) }()
    52  			i++
    53  			switch i {
    54  			case 0, 2, 5:
    55  				return &testpb.Empty{}, nil
    56  			case 6, 8, 11:
    57  				return nil, status.New(codes.Internal, "non-retryable error").Err()
    58  			}
    59  			return nil, status.New(codes.AlreadyExists, "retryable error").Err()
    60  		},
    61  	}
    62  	if err := ss.Start([]grpc.ServerOption{},
    63  		grpc.WithDefaultServiceConfig(`{
    64      "methodConfig": [{
    65        "name": [{"service": "grpc.testing.TestService"}],
    66        "waitForReady": true,
    67        "retryPolicy": {
    68          "MaxAttempts": 4,
    69          "InitialBackoff": ".01s",
    70          "MaxBackoff": ".01s",
    71          "BackoffMultiplier": 1.0,
    72          "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
    73        }
    74      }]}`)); err != nil {
    75  		t.Fatalf("Error starting endpoint server: %v", err)
    76  	}
    77  	defer ss.Stop()
    78  
    79  	testCases := []struct {
    80  		code  codes.Code
    81  		count int
    82  	}{
    83  		{codes.OK, 0},
    84  		{codes.OK, 2},
    85  		{codes.OK, 5},
    86  		{codes.Internal, 6},
    87  		{codes.Internal, 8},
    88  		{codes.Internal, 11},
    89  		{codes.AlreadyExists, 15},
    90  	}
    91  	for num, tc := range testCases {
    92  		t.Log("Case", num)
    93  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    94  		_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
    95  		cancel()
    96  		if status.Code(err) != tc.code {
    97  			t.Fatalf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
    98  		}
    99  		if i != tc.count {
   100  			t.Fatalf("i = %v; want %v", i, tc.count)
   101  		}
   102  	}
   103  }
   104  
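        // TestRetryThrottling verifies client-side retry throttling: the token bucket starts
        // at maxTokens (10), loses 1 token per failed attempt and regains tokenRatio (0.5)
        // per success, and retries are skipped while the bucket is at or below half of
        // maxTokens. The comments on the test cases below track that accounting.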
   105  func (s) TestRetryThrottling(t *testing.T) {
   106  	i := -1
   107  	ss := &stubserver.StubServer{
   108  		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
   109  			i++
   110  			switch i {
   111  			case 0, 3, 6, 10, 11, 12, 13, 14, 16, 18:
   112  				return &testpb.Empty{}, nil
   113  			}
   114  			return nil, status.New(codes.Unavailable, "retryable error").Err()
   115  		},
   116  	}
   117  	if err := ss.Start([]grpc.ServerOption{},
   118  		grpc.WithDefaultServiceConfig(`{
   119      "methodConfig": [{
   120        "name": [{"service": "grpc.testing.TestService"}],
   121        "waitForReady": true,
   122        "retryPolicy": {
   123          "MaxAttempts": 4,
   124          "InitialBackoff": ".01s",
   125          "MaxBackoff": ".01s",
   126          "BackoffMultiplier": 1.0,
   127          "RetryableStatusCodes": [ "UNAVAILABLE" ]
   128        }
   129      }],
   130      "retryThrottling": {
   131        "maxTokens": 10,
   132        "tokenRatio": 0.5
   133      }
   134      }`)); err != nil {
   135  		t.Fatalf("Error starting endpoint server: %v", err)
   136  	}
   137  	defer ss.Stop()
   138  
   139  	testCases := []struct {
   140  		code  codes.Code
   141  		count int
   142  	}{
   143  		{codes.OK, 0},           // tokens = 10
   144  		{codes.OK, 3},           // tokens = 8.5 (10 - 2 failures + 0.5 success)
   145  		{codes.OK, 6},           // tokens = 6
   146  		{codes.Unavailable, 8},  // tokens = 5 -- first attempt is retried; second aborted.
   147  		{codes.Unavailable, 9},  // tokens = 4
   148  		{codes.OK, 10},          // tokens = 4.5
   149  		{codes.OK, 11},          // tokens = 5
   150  		{codes.OK, 12},          // tokens = 5.5
   151  		{codes.OK, 13},          // tokens = 6
   152  		{codes.OK, 14},          // tokens = 6.5
   153  		{codes.OK, 16},          // tokens = 5.5
   154  		{codes.Unavailable, 17}, // tokens = 4.5
   155  	}
   156  	for _, tc := range testCases {
   157  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   158  		_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
   159  		cancel()
   160  		if status.Code(err) != tc.code {
   161  			t.Errorf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
   162  		}
   163  		if i != tc.count {
   164  			t.Errorf("i = %v; want %v", i, tc.count)
   165  		}
   166  	}
   167  }
   168  
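        // TestRetryStreaming drives a full-duplex RPC through scripted operations. Each test
        // case pairs the serverOps the stub server should consume (possibly spread across
        // several retry attempts) with the clientOps the client performs; a case fails if any
        // op errors or if the server is left with unconsumed ops.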
   169  func (s) TestRetryStreaming(t *testing.T) {
   170  	req := func(b byte) *testpb.StreamingOutputCallRequest {
   171  		return &testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{Body: []byte{b}}}
   172  	}
   173  	res := func(b byte) *testpb.StreamingOutputCallResponse {
   174  		return &testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{Body: []byte{b}}}
   175  	}
   176  
   177  	largePayload, _ := newPayload(testpb.PayloadType_COMPRESSABLE, 500)
   178  
   179  	type serverOp func(stream testgrpc.TestService_FullDuplexCallServer) error
   180  	type clientOp func(stream testgrpc.TestService_FullDuplexCallClient) error
   181  
   182  	// Server Operations
   183  	sAttempts := func(n int) serverOp {
   184  		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
   185  			const key = "grpc-previous-rpc-attempts"
   186  			md, ok := metadata.FromIncomingContext(stream.Context())
   187  			if !ok {
   188  				return status.Errorf(codes.Internal, "server: no header metadata received")
   189  			}
   190  			if got := md[key]; len(got) != 1 || got[0] != strconv.Itoa(n) {
   191  				return status.Errorf(codes.Internal, "server: metadata = %v; want <contains %q: %q>", md, key, strconv.Itoa(n))
   192  			}
   193  			return nil
   194  		}
   195  	}
   196  	sReq := func(b byte) serverOp {
   197  		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
   198  			want := req(b)
   199  			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
   200  				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
   201  			}
   202  			return nil
   203  		}
   204  	}
   205  	sReqPayload := func(p *testpb.Payload) serverOp {
   206  		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
   207  			want := &testpb.StreamingOutputCallRequest{Payload: p}
   208  			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
   209  				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
   210  			}
   211  			return nil
   212  		}
   213  	}
   214  	sHdr := func() serverOp {
   215  		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
   216  			return stream.SendHeader(metadata.Pairs("test_header", "test_value"))
   217  		}
   218  	}
   219  	sRes := func(b byte) serverOp {
   220  		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
   221  			msg := res(b)
   222  			if err := stream.Send(msg); err != nil {
   223  				return status.Errorf(codes.Internal, "server: Send(%v) = %v; want <nil>", msg, err)
   224  			}
   225  			return nil
   226  		}
   227  	}
   228  	sErr := func(c codes.Code) serverOp {
   229  		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
   230  			return status.New(c, "this is a test error").Err()
   231  		}
   232  	}
   233  	sCloseSend := func() serverOp {
   234  		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
   235  			if msg, err := stream.Recv(); msg != nil || err != io.EOF {
   236  				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want <nil>, io.EOF", msg, err)
   237  			}
   238  			return nil
   239  		}
   240  	}
   241  	sPushback := func(s string) serverOp {
   242  		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
   243  			stream.SetTrailer(metadata.MD{"grpc-retry-pushback-ms": []string{s}})
   244  			return nil
   245  		}
   246  	}
   247  
   248  	// Client Operations
   249  	cReq := func(b byte) clientOp {
   250  		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
   251  			msg := req(b)
   252  			if err := stream.Send(msg); err != nil {
   253  				return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
   254  			}
   255  			return nil
   256  		}
   257  	}
   258  	cReqPayload := func(p *testpb.Payload) clientOp {
   259  		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
   260  			msg := &testpb.StreamingOutputCallRequest{Payload: p}
   261  			if err := stream.Send(msg); err != nil {
   262  				return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
   263  			}
   264  			return nil
   265  		}
   266  	}
   267  	cRes := func(b byte) clientOp {
   268  		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
   269  			want := res(b)
   270  			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
   271  				return fmt.Errorf("client: Recv() = %v, %v; want %v, <nil>", got, err, want)
   272  			}
   273  			return nil
   274  		}
   275  	}
   276  	cErr := func(c codes.Code) clientOp {
   277  		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
   278  			want := status.New(c, "this is a test error").Err()
   279  			if c == codes.OK {
   280  				want = io.EOF
   281  			}
   282  			res, err := stream.Recv()
   283  			if res != nil ||
   284  				((err == nil) != (want == nil)) ||
   285  				(want != nil && err.Error() != want.Error()) {
   286  				return fmt.Errorf("client: Recv() = %v, %v; want <nil>, %v", res, err, want)
   287  			}
   288  			return nil
   289  		}
   290  	}
   291  	cCloseSend := func() clientOp {
   292  		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
   293  			if err := stream.CloseSend(); err != nil {
   294  				return fmt.Errorf("client: CloseSend() = %v; want <nil>", err)
   295  			}
   296  			return nil
   297  		}
   298  	}
   299  	var curTime time.Time
   300  	cGetTime := func() clientOp {
   301  		return func(_ testgrpc.TestService_FullDuplexCallClient) error {
   302  			curTime = time.Now()
   303  			return nil
   304  		}
   305  	}
   306  	cCheckElapsed := func(d time.Duration) clientOp {
   307  		return func(_ testgrpc.TestService_FullDuplexCallClient) error {
   308  			if elapsed := time.Since(curTime); elapsed < d {
   309  				return fmt.Errorf("elapsed time: %v; want >= %v", elapsed, d)
   310  			}
   311  			return nil
   312  		}
   313  	}
   314  	cHdr := func() clientOp {
   315  		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
   316  			_, err := stream.Header()
   317  			if err == io.EOF {
   318  				// The stream ended successfully; convert to nil to avoid
   319  				// erroring the test case.
   320  				err = nil
   321  			}
   322  			return err
   323  		}
   324  	}
   325  	cCtx := func() clientOp {
   326  		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
   327  			stream.Context()
   328  			return nil
   329  		}
   330  	}
   331  
   332  	testCases := []struct {
   333  		desc      string
   334  		serverOps []serverOp
   335  		clientOps []clientOp
   336  	}{{
   337  		desc:      "Non-retryable error code",
   338  		serverOps: []serverOp{sReq(1), sErr(codes.Internal)},
   339  		clientOps: []clientOp{cReq(1), cErr(codes.Internal)},
   340  	}, {
   341  		desc:      "One retry necessary",
   342  		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1), sRes(1)},
   343  		clientOps: []clientOp{cReq(1), cRes(1), cErr(codes.OK)},
   344  	}, {
   345  		desc: "Exceed max attempts (4); check attempts header on server",
   346  		serverOps: []serverOp{
   347  			sReq(1), sErr(codes.Unavailable),
   348  			sReq(1), sAttempts(1), sErr(codes.Unavailable),
   349  			sAttempts(2), sReq(1), sErr(codes.Unavailable),
   350  			sAttempts(3), sReq(1), sErr(codes.Unavailable),
   351  		},
   352  		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
   353  	}, {
   354  		desc: "Multiple requests",
   355  		serverOps: []serverOp{
   356  			sReq(1), sReq(2), sErr(codes.Unavailable),
   357  			sReq(1), sReq(2), sRes(5),
   358  		},
   359  		clientOps: []clientOp{cReq(1), cReq(2), cRes(5), cErr(codes.OK)},
   360  	}, {
   361  		desc: "Multiple successive requests",
   362  		serverOps: []serverOp{
   363  			sReq(1), sErr(codes.Unavailable),
   364  			sReq(1), sReq(2), sErr(codes.Unavailable),
   365  			sReq(1), sReq(2), sReq(3), sRes(5),
   366  		},
   367  		clientOps: []clientOp{cReq(1), cReq(2), cReq(3), cRes(5), cErr(codes.OK)},
   368  	}, {
   369  		desc: "No retry after receiving",
   370  		serverOps: []serverOp{
   371  			sReq(1), sErr(codes.Unavailable),
   372  			sReq(1), sRes(3), sErr(codes.Unavailable),
   373  		},
   374  		clientOps: []clientOp{cReq(1), cRes(3), cErr(codes.Unavailable)},
   375  	}, {
   376  		desc:      "Retry via ClientStream.Header()",
   377  		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1)},
   378  		clientOps: []clientOp{cReq(1), cHdr() /* this should cause a retry */, cErr(codes.OK)},
   379  	}, {
   380  		desc:      "No retry after header",
   381  		serverOps: []serverOp{sReq(1), sHdr(), sErr(codes.Unavailable)},
   382  		clientOps: []clientOp{cReq(1), cHdr(), cErr(codes.Unavailable)},
   383  	}, {
   384  		desc:      "No retry after context",
   385  		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable)},
   386  		clientOps: []clientOp{cReq(1), cCtx(), cErr(codes.Unavailable)},
   387  	}, {
   388  		desc: "Replaying close send",
   389  		serverOps: []serverOp{
   390  			sReq(1), sReq(2), sCloseSend(), sErr(codes.Unavailable),
   391  			sReq(1), sReq(2), sCloseSend(), sRes(1), sRes(3), sRes(5),
   392  		},
   393  		clientOps: []clientOp{cReq(1), cReq(2), cCloseSend(), cRes(1), cRes(3), cRes(5), cErr(codes.OK)},
   394  	}, {
   395  		desc:      "Negative server pushback - no retry",
   396  		serverOps: []serverOp{sReq(1), sPushback("-1"), sErr(codes.Unavailable)},
   397  		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
   398  	}, {
   399  		desc:      "Non-numeric server pushback - no retry",
   400  		serverOps: []serverOp{sReq(1), sPushback("xxx"), sErr(codes.Unavailable)},
   401  		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
   402  	}, {
   403  		desc:      "Multiple server pushback values - no retry",
   404  		serverOps: []serverOp{sReq(1), sPushback("100"), sPushback("10"), sErr(codes.Unavailable)},
   405  		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
   406  	}, {
   407  		desc:      "1s server pushback - delayed retry",
   408  		serverOps: []serverOp{sReq(1), sPushback("1000"), sErr(codes.Unavailable), sReq(1), sRes(2)},
   409  		clientOps: []clientOp{cGetTime(), cReq(1), cRes(2), cCheckElapsed(time.Second), cErr(codes.OK)},
   410  	}, {
   411  		desc:      "Overflowing buffer - no retry",
   412  		serverOps: []serverOp{sReqPayload(largePayload), sErr(codes.Unavailable)},
   413  		clientOps: []clientOp{cReqPayload(largePayload), cErr(codes.Unavailable)},
   414  	}}
   415  
   416  	var serverOpIter int
   417  	var serverOps []serverOp
   418  	ss := &stubserver.StubServer{
   419  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
   420  			for serverOpIter < len(serverOps) {
   421  				op := serverOps[serverOpIter]
   422  				serverOpIter++
   423  				if err := op(stream); err != nil {
   424  					return err
   425  				}
   426  			}
   427  			return nil
   428  		},
   429  	}
   430  	if err := ss.Start([]grpc.ServerOption{}, grpc.WithDefaultCallOptions(grpc.MaxRetryRPCBufferSize(200)),
   431  		grpc.WithDefaultServiceConfig(`{
   432      "methodConfig": [{
   433        "name": [{"service": "grpc.testing.TestService"}],
   434        "waitForReady": true,
   435        "retryPolicy": {
   436            "MaxAttempts": 4,
   437            "InitialBackoff": ".01s",
   438            "MaxBackoff": ".01s",
   439            "BackoffMultiplier": 1.0,
   440            "RetryableStatusCodes": [ "UNAVAILABLE" ]
   441        }
   442      }]}`)); err != nil {
   443  		t.Fatalf("Error starting endpoint server: %v", err)
   444  	}
   445  	defer ss.Stop()
   446  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   447  	defer cancel()
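        	// Wait until the default service config (recognizable here by its waitForReady
        	// setting) has been applied to the channel before running the test cases.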
   448  	for {
   449  		if ctx.Err() != nil {
   450  			t.Fatalf("Timed out waiting for service config update")
   451  		}
   452  		if ss.CC.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
   453  			break
   454  		}
   455  		time.Sleep(time.Millisecond)
   456  	}
   457  
   458  	for _, tc := range testCases {
   459  		func() {
   460  			serverOpIter = 0
   461  			serverOps = tc.serverOps
   462  
   463  			stream, err := ss.Client.FullDuplexCall(ctx)
   464  			if err != nil {
   465  				t.Fatalf("%v: Error while creating stream: %v", tc.desc, err)
   466  			}
   467  			for _, op := range tc.clientOps {
   468  				if err := op(stream); err != nil {
   469  					t.Errorf("%v: %v", tc.desc, err)
   470  					break
   471  				}
   472  			}
   473  			if serverOpIter != len(serverOps) {
   474  				t.Errorf("%v: serverOpIter = %v; want %v", tc.desc, serverOpIter, len(serverOps))
   475  			}
   476  		}()
   477  	}
   478  }
   479  
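        // retryStatsHandler records the client-side stats.RPCStats events (other than
        // PickerUpdated) so the test can assert on the exact sequence emitted across
        // retry attempts.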
   480  type retryStatsHandler struct {
   481  	mu sync.Mutex
   482  	s  []stats.RPCStats
   483  }
   484  
   485  func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
   486  	return ctx
   487  }
   488  func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
   489  	// PickerUpdated events arrive nondeterministically, so ignore them rather than recording them.
   490  	if _, ok := s.(*stats.PickerUpdated); ok {
   491  		return
   492  	}
   493  	h.mu.Lock()
   494  	h.s = append(h.s, s)
   495  	h.mu.Unlock()
   496  }
   497  func (*retryStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
   498  	return ctx
   499  }
   500  func (*retryStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
   501  
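        // TestRetryStats checks the stats events emitted when an RPC takes three attempts:
        // the first is refused at the transport level, the second is the resulting
        // transparent retry and ends with UNAVAILABLE plus a 10ms grpc-retry-pushback-ms
        // trailer, and the third (a policy retry) succeeds. It also verifies that the
        // pushback delay is honored before the final attempt.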
   502  func (s) TestRetryStats(t *testing.T) {
   503  	lis, err := net.Listen("tcp", "localhost:0")
   504  	if err != nil {
   505  		t.Fatalf("Failed to listen. Err: %v", err)
   506  	}
   507  	defer lis.Close()
   508  	server := &httpServer{
   509  		waitForEndStream: true,
   510  		responses: []httpServerResponse{{
   511  			trailers: [][]string{{
   512  				":status", "200",
   513  				"content-type", "application/grpc",
   514  				"grpc-status", "14", // UNAVAILABLE
   515  				"grpc-message", "unavailable retry",
   516  				"grpc-retry-pushback-ms", "10",
   517  			}},
   518  		}, {
   519  			headers: [][]string{{
   520  				":status", "200",
   521  				"content-type", "application/grpc",
   522  			}},
   523  			payload: []byte{0, 0, 0, 0, 0}, // header for 0-byte response message.
   524  			trailers: [][]string{{
   525  				"grpc-status", "0", // OK
   526  			}},
   527  		}},
   528  		refuseStream: func(i uint32) bool {
   529  			return i == 1
   530  		},
   531  	}
   532  	server.start(t, lis)
   533  	handler := &retryStatsHandler{}
   534  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(handler),
   535  		grpc.WithDefaultServiceConfig((`{
   536      "methodConfig": [{
   537        "name": [{"service": "grpc.testing.TestService"}],
   538        "retryPolicy": {
   539            "MaxAttempts": 4,
   540            "InitialBackoff": ".01s",
   541            "MaxBackoff": ".01s",
   542            "BackoffMultiplier": 1.0,
   543            "RetryableStatusCodes": [ "UNAVAILABLE" ]
   544        }
   545      }]}`)))
   546  	if err != nil {
   547  		t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr().String(), err)
   548  	}
   549  	defer cc.Close()
   550  
   551  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   552  	defer cancel()
   553  
   554  	client := testgrpc.NewTestServiceClient(cc)
   555  
   556  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   557  		t.Fatalf("unexpected EmptyCall error: %v", err)
   558  	}
   559  	handler.mu.Lock()
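        	// Expected events for the three attempts: the refused stream, its transparent
        	// retry (which receives UNAVAILABLE plus pushback), and the final successful
        	// policy retry.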
   560  	want := []stats.RPCStats{
   561  		&stats.Begin{},
   562  		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
   563  		&stats.OutPayload{WireLength: 5},
   564  		&stats.End{},
   565  
   566  		&stats.Begin{IsTransparentRetryAttempt: true},
   567  		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
   568  		&stats.OutPayload{WireLength: 5},
   569  		&stats.InTrailer{Trailer: metadata.Pairs("content-type", "application/grpc", "grpc-retry-pushback-ms", "10")},
   570  		&stats.End{},
   571  
   572  		&stats.Begin{},
   573  		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
   574  		&stats.OutPayload{WireLength: 5},
   575  		&stats.InHeader{},
   576  		&stats.InPayload{WireLength: 5},
   577  		&stats.InTrailer{},
   578  		&stats.End{},
   579  	}
   580  
   581  	toString := func(ss []stats.RPCStats) (ret []string) {
   582  		for _, s := range ss {
   583  			ret = append(ret, fmt.Sprintf("%T - %v", s, s))
   584  		}
   585  		return ret
   586  	}
   587  	t.Logf("Handler received frames:\n%v\n---\nwant:\n%v\n",
   588  		strings.Join(toString(handler.s), "\n"),
   589  		strings.Join(toString(want), "\n"))
   590  
   591  	if len(handler.s) != len(want) {
   592  		t.Fatalf("received unexpected number of RPCStats: got %v; want %v", len(handler.s), len(want))
   593  	}
   594  
   595  	// There is a race between receiving the payload (triggered by the
   596  	// application / gRPC library) and receiving the trailer (triggered at the
   597  	// transport layer).  Adjust the received stats accordingly if necessary.
   598  	const tIdx, pIdx = 13, 14
   599  	_, okT := handler.s[tIdx].(*stats.InTrailer)
   600  	_, okP := handler.s[pIdx].(*stats.InPayload)
   601  	if okT && okP {
   602  		handler.s[pIdx], handler.s[tIdx] = handler.s[tIdx], handler.s[pIdx]
   603  	}
   604  
   605  	for i := range handler.s {
   606  		w, s := want[i], handler.s[i]
   607  
   608  		// Validate the event type
   609  		if reflect.TypeOf(w) != reflect.TypeOf(s) {
   610  			t.Fatalf("at position %v: got %T; want %T", i, s, w)
   611  		}
   612  		wv, sv := reflect.ValueOf(w).Elem(), reflect.ValueOf(s).Elem()
   613  
   614  		// Validate that Client is always true
   615  		if sv.FieldByName("Client").Interface().(bool) != true {
   616  			t.Fatalf("at position %v: got Client=false; want true", i)
   617  		}
   618  
   619  		// Validate any set fields in want
   620  		for i := 0; i < wv.NumField(); i++ {
   621  			if !wv.Field(i).IsZero() {
   622  				if got, want := sv.Field(i).Interface(), wv.Field(i).Interface(); !reflect.DeepEqual(got, want) {
   623  					name := reflect.TypeOf(w).Elem().Field(i).Name
   624  					t.Fatalf("at position %v, field %v: got %v; want %v", i, name, got, want)
   625  				}
   626  			}
   627  		}
   628  
   629  		// Since the above only tests non-zero-value fields, test
   630  		// IsTransparentRetryAttempt=false explicitly when needed.
   631  		if wb, ok := w.(*stats.Begin); ok && !wb.IsTransparentRetryAttempt {
   632  			if s.(*stats.Begin).IsTransparentRetryAttempt {
   633  				t.Fatalf("at position %v: got IsTransparentRetryAttempt=true; want false", i)
   634  			}
   635  		}
   636  	}
   637  
   638  	// Validate timings between last Begin and preceding End.
   639  	end := handler.s[8].(*stats.End)
   640  	begin := handler.s[9].(*stats.Begin)
   641  	diff := begin.BeginTime.Sub(end.EndTime)
   642  	if diff < 10*time.Millisecond || diff > 50*time.Millisecond {
   643  		t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff)
   644  	}
   645  }
   646  
   647  func (s) TestRetryTransparentWhenCommitted(t *testing.T) {
   648  	// With MaxConcurrentStreams=1:
   649  	//
   650  	// 1. Create stream 1 that is retriable.
   651  	// 2. Stream 1 is created and fails with a retriable code.
   652  	// 3. Create dummy stream 2, blocking indefinitely.
   653  	// 4. Stream 1 retries (and blocks until stream 2 finishes)
   654  	// 5. Stream 1 is canceled manually.
   655  	//
   656  	// If there is no bug, the stream is done and errors with CANCELED.  With a bug:
   657  	//
   658  	// 6. Stream 1 has a nil stream (attempt.s).  Operations like CloseSend will panic.
   659  
   660  	first := grpcsync.NewEvent()
   661  	ss := &stubserver.StubServer{
   662  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
   663  			// Fail only the first attempt with a retriable status; block every later attempt until its context is canceled.
   664  			if !first.HasFired() {
   665  				first.Fire()
   666  				t.Log("returned first error")
   667  				return status.Error(codes.AlreadyExists, "first attempt fails and is retriable")
   668  			}
   669  			t.Log("blocking")
   670  			<-stream.Context().Done()
   671  			return stream.Context().Err()
   672  		},
   673  	}
   674  
   675  	if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)},
   676  		grpc.WithDefaultServiceConfig(`{
   677      "methodConfig": [{
   678        "name": [{"service": "grpc.testing.TestService"}],
   679        "waitForReady": true,
   680        "retryPolicy": {
   681          "MaxAttempts": 2,
   682          "InitialBackoff": ".1s",
   683          "MaxBackoff": ".1s",
   684          "BackoffMultiplier": 1.0,
   685          "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
   686        }
   687      }]}`)); err != nil {
   688  		t.Fatalf("Error starting endpoint server: %v", err)
   689  	}
   690  	defer ss.Stop()
   691  
   692  	ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTestTimeout)
   693  	defer cancel1()
   694  	ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTestTimeout)
   695  	defer cancel2()
   696  
   697  	stream1, err := ss.Client.FullDuplexCall(ctx1)
   698  	if err != nil {
   699  		t.Fatalf("Error creating stream 1: %v", err)
   700  	}
   701  
   702  	// Create dummy stream to block indefinitely.
   703  	_, err = ss.Client.FullDuplexCall(ctx2)
   704  	if err != nil {
   705  		t.Errorf("Error creating stream 2: %v", err)
   706  	}
   707  
   708  	stream1Closed := grpcsync.NewEvent()
   709  	go func() {
   710  		_, err := stream1.Recv()
   711  		// The retry is triggered internally by the first attempt's ALREADY_EXISTS error; the caller should only ever observe the final Canceled status.
   712  		if status.Code(err) != codes.Canceled {
   713  			t.Errorf("Expected stream1 to be canceled; got error: %v", err)
   714  		}
   715  		stream1Closed.Fire()
   716  	}()
   717  
   718  	// Wait longer than the retry backoff timer.
   719  	time.Sleep(200 * time.Millisecond)
   720  	cancel1()
   721  
   722  	// Operations on the stream should not panic.
   723  	<-stream1Closed.Done()
   724  	stream1.CloseSend()
   725  	stream1.Recv()
   726  	stream1.Send(&testpb.StreamingOutputCallRequest{})
   727  }
   728  
