1
18
19 package test
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "net"
26 "reflect"
27 "strconv"
28 "strings"
29 "sync"
30 "testing"
31 "time"
32
33 "google.golang.org/grpc"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials/insecure"
36 "google.golang.org/grpc/internal/grpcsync"
37 "google.golang.org/grpc/internal/stubserver"
38 "google.golang.org/grpc/metadata"
39 "google.golang.org/grpc/stats"
40 "google.golang.org/grpc/status"
41 "google.golang.org/protobuf/proto"
42
43 testgrpc "google.golang.org/grpc/interop/grpc_testing"
44 testpb "google.golang.org/grpc/interop/grpc_testing"
45 )
46
47 func (s) TestRetryUnary(t *testing.T) {
48 i := -1
49 ss := &stubserver.StubServer{
50 EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
51 defer func() { t.Logf("server call %v returning err %v", i, err) }()
52 i++
53 switch i {
54 case 0, 2, 5:
55 return &testpb.Empty{}, nil
56 case 6, 8, 11:
57 return nil, status.New(codes.Internal, "non-retryable error").Err()
58 }
59 return nil, status.New(codes.AlreadyExists, "retryable error").Err()
60 },
61 }
62 if err := ss.Start([]grpc.ServerOption{},
63 grpc.WithDefaultServiceConfig(`{
64 "methodConfig": [{
65 "name": [{"service": "grpc.testing.TestService"}],
66 "waitForReady": true,
67 "retryPolicy": {
68 "MaxAttempts": 4,
69 "InitialBackoff": ".01s",
70 "MaxBackoff": ".01s",
71 "BackoffMultiplier": 1.0,
72 "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
73 }
74 }]}`)); err != nil {
75 t.Fatalf("Error starting endpoint server: %v", err)
76 }
77 defer ss.Stop()
78
79 testCases := []struct {
80 code codes.Code
81 count int
82 }{
83 {codes.OK, 0},
84 {codes.OK, 2},
85 {codes.OK, 5},
86 {codes.Internal, 6},
87 {codes.Internal, 8},
88 {codes.Internal, 11},
89 {codes.AlreadyExists, 15},
90 }
91 for num, tc := range testCases {
92 t.Log("Case", num)
93 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
94 _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
95 cancel()
96 if status.Code(err) != tc.code {
97 t.Fatalf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
98 }
99 if i != tc.count {
100 t.Fatalf("i = %v; want %v", i, tc.count)
101 }
102 }
103 }
104
105 func (s) TestRetryThrottling(t *testing.T) {
106 i := -1
107 ss := &stubserver.StubServer{
108 EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
109 i++
110 switch i {
111 case 0, 3, 6, 10, 11, 12, 13, 14, 16, 18:
112 return &testpb.Empty{}, nil
113 }
114 return nil, status.New(codes.Unavailable, "retryable error").Err()
115 },
116 }
117 if err := ss.Start([]grpc.ServerOption{},
118 grpc.WithDefaultServiceConfig(`{
119 "methodConfig": [{
120 "name": [{"service": "grpc.testing.TestService"}],
121 "waitForReady": true,
122 "retryPolicy": {
123 "MaxAttempts": 4,
124 "InitialBackoff": ".01s",
125 "MaxBackoff": ".01s",
126 "BackoffMultiplier": 1.0,
127 "RetryableStatusCodes": [ "UNAVAILABLE" ]
128 }
129 }],
130 "retryThrottling": {
131 "maxTokens": 10,
132 "tokenRatio": 0.5
133 }
134 }`)); err != nil {
135 t.Fatalf("Error starting endpoint server: %v", err)
136 }
137 defer ss.Stop()
138
139 testCases := []struct {
140 code codes.Code
141 count int
142 }{
143 {codes.OK, 0},
144 {codes.OK, 3},
145 {codes.OK, 6},
146 {codes.Unavailable, 8},
147 {codes.Unavailable, 9},
148 {codes.OK, 10},
149 {codes.OK, 11},
150 {codes.OK, 12},
151 {codes.OK, 13},
152 {codes.OK, 14},
153 {codes.OK, 16},
154 {codes.Unavailable, 17},
155 }
156 for _, tc := range testCases {
157 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
158 _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
159 cancel()
160 if status.Code(err) != tc.code {
161 t.Errorf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
162 }
163 if i != tc.count {
164 t.Errorf("i = %v; want %v", i, tc.count)
165 }
166 }
167 }
168
169 func (s) TestRetryStreaming(t *testing.T) {
170 req := func(b byte) *testpb.StreamingOutputCallRequest {
171 return &testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{Body: []byte{b}}}
172 }
173 res := func(b byte) *testpb.StreamingOutputCallResponse {
174 return &testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{Body: []byte{b}}}
175 }
176
177 largePayload, _ := newPayload(testpb.PayloadType_COMPRESSABLE, 500)
178
179 type serverOp func(stream testgrpc.TestService_FullDuplexCallServer) error
180 type clientOp func(stream testgrpc.TestService_FullDuplexCallClient) error
181
182
183 sAttempts := func(n int) serverOp {
184 return func(stream testgrpc.TestService_FullDuplexCallServer) error {
185 const key = "grpc-previous-rpc-attempts"
186 md, ok := metadata.FromIncomingContext(stream.Context())
187 if !ok {
188 return status.Errorf(codes.Internal, "server: no header metadata received")
189 }
190 if got := md[key]; len(got) != 1 || got[0] != strconv.Itoa(n) {
191 return status.Errorf(codes.Internal, "server: metadata = %v; want <contains %q: %q>", md, key, n)
192 }
193 return nil
194 }
195 }
196 sReq := func(b byte) serverOp {
197 return func(stream testgrpc.TestService_FullDuplexCallServer) error {
198 want := req(b)
199 if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
200 return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
201 }
202 return nil
203 }
204 }
205 sReqPayload := func(p *testpb.Payload) serverOp {
206 return func(stream testgrpc.TestService_FullDuplexCallServer) error {
207 want := &testpb.StreamingOutputCallRequest{Payload: p}
208 if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
209 return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
210 }
211 return nil
212 }
213 }
214 sHdr := func() serverOp {
215 return func(stream testgrpc.TestService_FullDuplexCallServer) error {
216 return stream.SendHeader(metadata.Pairs("test_header", "test_value"))
217 }
218 }
219 sRes := func(b byte) serverOp {
220 return func(stream testgrpc.TestService_FullDuplexCallServer) error {
221 msg := res(b)
222 if err := stream.Send(msg); err != nil {
223 return status.Errorf(codes.Internal, "server: Send(%v) = %v; want <nil>", msg, err)
224 }
225 return nil
226 }
227 }
228 sErr := func(c codes.Code) serverOp {
229 return func(stream testgrpc.TestService_FullDuplexCallServer) error {
230 return status.New(c, "this is a test error").Err()
231 }
232 }
233 sCloseSend := func() serverOp {
234 return func(stream testgrpc.TestService_FullDuplexCallServer) error {
235 if msg, err := stream.Recv(); msg != nil || err != io.EOF {
236 return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want <nil>, io.EOF", msg, err)
237 }
238 return nil
239 }
240 }
241 sPushback := func(s string) serverOp {
242 return func(stream testgrpc.TestService_FullDuplexCallServer) error {
243 stream.SetTrailer(metadata.MD{"grpc-retry-pushback-ms": []string{s}})
244 return nil
245 }
246 }
247
248
249 cReq := func(b byte) clientOp {
250 return func(stream testgrpc.TestService_FullDuplexCallClient) error {
251 msg := req(b)
252 if err := stream.Send(msg); err != nil {
253 return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
254 }
255 return nil
256 }
257 }
258 cReqPayload := func(p *testpb.Payload) clientOp {
259 return func(stream testgrpc.TestService_FullDuplexCallClient) error {
260 msg := &testpb.StreamingOutputCallRequest{Payload: p}
261 if err := stream.Send(msg); err != nil {
262 return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
263 }
264 return nil
265 }
266 }
267 cRes := func(b byte) clientOp {
268 return func(stream testgrpc.TestService_FullDuplexCallClient) error {
269 want := res(b)
270 if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
271 return fmt.Errorf("client: Recv() = %v, %v; want %v, <nil>", got, err, want)
272 }
273 return nil
274 }
275 }
276 cErr := func(c codes.Code) clientOp {
277 return func(stream testgrpc.TestService_FullDuplexCallClient) error {
278 want := status.New(c, "this is a test error").Err()
279 if c == codes.OK {
280 want = io.EOF
281 }
282 res, err := stream.Recv()
283 if res != nil ||
284 ((err == nil) != (want == nil)) ||
285 (want != nil && err.Error() != want.Error()) {
286 return fmt.Errorf("client: Recv() = %v, %v; want <nil>, %v", res, err, want)
287 }
288 return nil
289 }
290 }
291 cCloseSend := func() clientOp {
292 return func(stream testgrpc.TestService_FullDuplexCallClient) error {
293 if err := stream.CloseSend(); err != nil {
294 return fmt.Errorf("client: CloseSend() = %v; want <nil>", err)
295 }
296 return nil
297 }
298 }
299 var curTime time.Time
300 cGetTime := func() clientOp {
301 return func(_ testgrpc.TestService_FullDuplexCallClient) error {
302 curTime = time.Now()
303 return nil
304 }
305 }
306 cCheckElapsed := func(d time.Duration) clientOp {
307 return func(_ testgrpc.TestService_FullDuplexCallClient) error {
308 if elapsed := time.Since(curTime); elapsed < d {
309 return fmt.Errorf("elapsed time: %v; want >= %v", elapsed, d)
310 }
311 return nil
312 }
313 }
314 cHdr := func() clientOp {
315 return func(stream testgrpc.TestService_FullDuplexCallClient) error {
316 _, err := stream.Header()
317 if err == io.EOF {
318
319
320 err = nil
321 }
322 return err
323 }
324 }
325 cCtx := func() clientOp {
326 return func(stream testgrpc.TestService_FullDuplexCallClient) error {
327 stream.Context()
328 return nil
329 }
330 }
331
332 testCases := []struct {
333 desc string
334 serverOps []serverOp
335 clientOps []clientOp
336 }{{
337 desc: "Non-retryable error code",
338 serverOps: []serverOp{sReq(1), sErr(codes.Internal)},
339 clientOps: []clientOp{cReq(1), cErr(codes.Internal)},
340 }, {
341 desc: "One retry necessary",
342 serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1), sRes(1)},
343 clientOps: []clientOp{cReq(1), cRes(1), cErr(codes.OK)},
344 }, {
345 desc: "Exceed max attempts (4); check attempts header on server",
346 serverOps: []serverOp{
347 sReq(1), sErr(codes.Unavailable),
348 sReq(1), sAttempts(1), sErr(codes.Unavailable),
349 sAttempts(2), sReq(1), sErr(codes.Unavailable),
350 sAttempts(3), sReq(1), sErr(codes.Unavailable),
351 },
352 clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
353 }, {
354 desc: "Multiple requests",
355 serverOps: []serverOp{
356 sReq(1), sReq(2), sErr(codes.Unavailable),
357 sReq(1), sReq(2), sRes(5),
358 },
359 clientOps: []clientOp{cReq(1), cReq(2), cRes(5), cErr(codes.OK)},
360 }, {
361 desc: "Multiple successive requests",
362 serverOps: []serverOp{
363 sReq(1), sErr(codes.Unavailable),
364 sReq(1), sReq(2), sErr(codes.Unavailable),
365 sReq(1), sReq(2), sReq(3), sRes(5),
366 },
367 clientOps: []clientOp{cReq(1), cReq(2), cReq(3), cRes(5), cErr(codes.OK)},
368 }, {
369 desc: "No retry after receiving",
370 serverOps: []serverOp{
371 sReq(1), sErr(codes.Unavailable),
372 sReq(1), sRes(3), sErr(codes.Unavailable),
373 },
374 clientOps: []clientOp{cReq(1), cRes(3), cErr(codes.Unavailable)},
375 }, {
376 desc: "Retry via ClientStream.Header()",
377 serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1)},
378 clientOps: []clientOp{cReq(1), cHdr() , cErr(codes.OK)},
379 }, {
380 desc: "No retry after header",
381 serverOps: []serverOp{sReq(1), sHdr(), sErr(codes.Unavailable)},
382 clientOps: []clientOp{cReq(1), cHdr(), cErr(codes.Unavailable)},
383 }, {
384 desc: "No retry after context",
385 serverOps: []serverOp{sReq(1), sErr(codes.Unavailable)},
386 clientOps: []clientOp{cReq(1), cCtx(), cErr(codes.Unavailable)},
387 }, {
388 desc: "Replaying close send",
389 serverOps: []serverOp{
390 sReq(1), sReq(2), sCloseSend(), sErr(codes.Unavailable),
391 sReq(1), sReq(2), sCloseSend(), sRes(1), sRes(3), sRes(5),
392 },
393 clientOps: []clientOp{cReq(1), cReq(2), cCloseSend(), cRes(1), cRes(3), cRes(5), cErr(codes.OK)},
394 }, {
395 desc: "Negative server pushback - no retry",
396 serverOps: []serverOp{sReq(1), sPushback("-1"), sErr(codes.Unavailable)},
397 clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
398 }, {
399 desc: "Non-numeric server pushback - no retry",
400 serverOps: []serverOp{sReq(1), sPushback("xxx"), sErr(codes.Unavailable)},
401 clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
402 }, {
403 desc: "Multiple server pushback values - no retry",
404 serverOps: []serverOp{sReq(1), sPushback("100"), sPushback("10"), sErr(codes.Unavailable)},
405 clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
406 }, {
407 desc: "1s server pushback - delayed retry",
408 serverOps: []serverOp{sReq(1), sPushback("1000"), sErr(codes.Unavailable), sReq(1), sRes(2)},
409 clientOps: []clientOp{cGetTime(), cReq(1), cRes(2), cCheckElapsed(time.Second), cErr(codes.OK)},
410 }, {
411 desc: "Overflowing buffer - no retry",
412 serverOps: []serverOp{sReqPayload(largePayload), sErr(codes.Unavailable)},
413 clientOps: []clientOp{cReqPayload(largePayload), cErr(codes.Unavailable)},
414 }}
415
416 var serverOpIter int
417 var serverOps []serverOp
418 ss := &stubserver.StubServer{
419 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
420 for serverOpIter < len(serverOps) {
421 op := serverOps[serverOpIter]
422 serverOpIter++
423 if err := op(stream); err != nil {
424 return err
425 }
426 }
427 return nil
428 },
429 }
430 if err := ss.Start([]grpc.ServerOption{}, grpc.WithDefaultCallOptions(grpc.MaxRetryRPCBufferSize(200)),
431 grpc.WithDefaultServiceConfig(`{
432 "methodConfig": [{
433 "name": [{"service": "grpc.testing.TestService"}],
434 "waitForReady": true,
435 "retryPolicy": {
436 "MaxAttempts": 4,
437 "InitialBackoff": ".01s",
438 "MaxBackoff": ".01s",
439 "BackoffMultiplier": 1.0,
440 "RetryableStatusCodes": [ "UNAVAILABLE" ]
441 }
442 }]}`)); err != nil {
443 t.Fatalf("Error starting endpoint server: %v", err)
444 }
445 defer ss.Stop()
446 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
447 defer cancel()
448 for {
449 if ctx.Err() != nil {
450 t.Fatalf("Timed out waiting for service config update")
451 }
452 if ss.CC.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
453 break
454 }
455 time.Sleep(time.Millisecond)
456 }
457
458 for _, tc := range testCases {
459 func() {
460 serverOpIter = 0
461 serverOps = tc.serverOps
462
463 stream, err := ss.Client.FullDuplexCall(ctx)
464 if err != nil {
465 t.Fatalf("%v: Error while creating stream: %v", tc.desc, err)
466 }
467 for _, op := range tc.clientOps {
468 if err := op(stream); err != nil {
469 t.Errorf("%v: %v", tc.desc, err)
470 break
471 }
472 }
473 if serverOpIter != len(serverOps) {
474 t.Errorf("%v: serverOpIter = %v; want %v", tc.desc, serverOpIter, len(serverOps))
475 }
476 }()
477 }
478 }
479
480 type retryStatsHandler struct {
481 mu sync.Mutex
482 s []stats.RPCStats
483 }
484
485 func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
486 return ctx
487 }
488 func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
489
490 if _, ok := s.(*stats.PickerUpdated); ok {
491 return
492 }
493 h.mu.Lock()
494 h.s = append(h.s, s)
495 h.mu.Unlock()
496 }
497 func (*retryStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
498 return ctx
499 }
500 func (*retryStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
501
502 func (s) TestRetryStats(t *testing.T) {
503 lis, err := net.Listen("tcp", "localhost:0")
504 if err != nil {
505 t.Fatalf("Failed to listen. Err: %v", err)
506 }
507 defer lis.Close()
508 server := &httpServer{
509 waitForEndStream: true,
510 responses: []httpServerResponse{{
511 trailers: [][]string{{
512 ":status", "200",
513 "content-type", "application/grpc",
514 "grpc-status", "14",
515 "grpc-message", "unavailable retry",
516 "grpc-retry-pushback-ms", "10",
517 }},
518 }, {
519 headers: [][]string{{
520 ":status", "200",
521 "content-type", "application/grpc",
522 }},
523 payload: []byte{0, 0, 0, 0, 0},
524 trailers: [][]string{{
525 "grpc-status", "0",
526 }},
527 }},
528 refuseStream: func(i uint32) bool {
529 return i == 1
530 },
531 }
532 server.start(t, lis)
533 handler := &retryStatsHandler{}
534 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(handler),
535 grpc.WithDefaultServiceConfig((`{
536 "methodConfig": [{
537 "name": [{"service": "grpc.testing.TestService"}],
538 "retryPolicy": {
539 "MaxAttempts": 4,
540 "InitialBackoff": ".01s",
541 "MaxBackoff": ".01s",
542 "BackoffMultiplier": 1.0,
543 "RetryableStatusCodes": [ "UNAVAILABLE" ]
544 }
545 }]}`)))
546 if err != nil {
547 t.Fatalf("failed to dial due to err: %v", err)
548 }
549 defer cc.Close()
550
551 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
552 defer cancel()
553
554 client := testgrpc.NewTestServiceClient(cc)
555
556 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
557 t.Fatalf("unexpected EmptyCall error: %v", err)
558 }
559 handler.mu.Lock()
560 want := []stats.RPCStats{
561 &stats.Begin{},
562 &stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
563 &stats.OutPayload{WireLength: 5},
564 &stats.End{},
565
566 &stats.Begin{IsTransparentRetryAttempt: true},
567 &stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
568 &stats.OutPayload{WireLength: 5},
569 &stats.InTrailer{Trailer: metadata.Pairs("content-type", "application/grpc", "grpc-retry-pushback-ms", "10")},
570 &stats.End{},
571
572 &stats.Begin{},
573 &stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
574 &stats.OutPayload{WireLength: 5},
575 &stats.InHeader{},
576 &stats.InPayload{WireLength: 5},
577 &stats.InTrailer{},
578 &stats.End{},
579 }
580
581 toString := func(ss []stats.RPCStats) (ret []string) {
582 for _, s := range ss {
583 ret = append(ret, fmt.Sprintf("%T - %v", s, s))
584 }
585 return ret
586 }
587 t.Logf("Handler received frames:\n%v\n---\nwant:\n%v\n",
588 strings.Join(toString(handler.s), "\n"),
589 strings.Join(toString(want), "\n"))
590
591 if len(handler.s) != len(want) {
592 t.Fatalf("received unexpected number of RPCStats: got %v; want %v", len(handler.s), len(want))
593 }
594
595
596
597
598 const tIdx, pIdx = 13, 14
599 _, okT := handler.s[tIdx].(*stats.InTrailer)
600 _, okP := handler.s[pIdx].(*stats.InPayload)
601 if okT && okP {
602 handler.s[pIdx], handler.s[tIdx] = handler.s[tIdx], handler.s[pIdx]
603 }
604
605 for i := range handler.s {
606 w, s := want[i], handler.s[i]
607
608
609 if reflect.TypeOf(w) != reflect.TypeOf(s) {
610 t.Fatalf("at position %v: got %T; want %T", i, s, w)
611 }
612 wv, sv := reflect.ValueOf(w).Elem(), reflect.ValueOf(s).Elem()
613
614
615 if sv.FieldByName("Client").Interface().(bool) != true {
616 t.Fatalf("at position %v: got Client=false; want true", i)
617 }
618
619
620 for i := 0; i < wv.NumField(); i++ {
621 if !wv.Field(i).IsZero() {
622 if got, want := sv.Field(i).Interface(), wv.Field(i).Interface(); !reflect.DeepEqual(got, want) {
623 name := reflect.TypeOf(w).Elem().Field(i).Name
624 t.Fatalf("at position %v, field %v: got %v; want %v", i, name, got, want)
625 }
626 }
627 }
628
629
630
631 if wb, ok := w.(*stats.Begin); ok && !wb.IsTransparentRetryAttempt {
632 if s.(*stats.Begin).IsTransparentRetryAttempt {
633 t.Fatalf("at position %v: got IsTransparentRetryAttempt=true; want false", i)
634 }
635 }
636 }
637
638
639 end := handler.s[8].(*stats.End)
640 begin := handler.s[9].(*stats.Begin)
641 diff := begin.BeginTime.Sub(end.EndTime)
642 if diff < 10*time.Millisecond || diff > 50*time.Millisecond {
643 t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff)
644 }
645 }
646
647 func (s) TestRetryTransparentWhenCommitted(t *testing.T) {
648
649
650
651
652
653
654
655
656
657
658
659
660 first := grpcsync.NewEvent()
661 ss := &stubserver.StubServer{
662 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
663
664 if !first.HasFired() {
665 first.Fire()
666 t.Log("returned first error")
667 return status.Error(codes.AlreadyExists, "first attempt fails and is retriable")
668 }
669 t.Log("blocking")
670 <-stream.Context().Done()
671 return stream.Context().Err()
672 },
673 }
674
675 if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)},
676 grpc.WithDefaultServiceConfig(`{
677 "methodConfig": [{
678 "name": [{"service": "grpc.testing.TestService"}],
679 "waitForReady": true,
680 "retryPolicy": {
681 "MaxAttempts": 2,
682 "InitialBackoff": ".1s",
683 "MaxBackoff": ".1s",
684 "BackoffMultiplier": 1.0,
685 "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
686 }
687 }]}`)); err != nil {
688 t.Fatalf("Error starting endpoint server: %v", err)
689 }
690 defer ss.Stop()
691
692 ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTestTimeout)
693 defer cancel1()
694 ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTestTimeout)
695 defer cancel2()
696
697 stream1, err := ss.Client.FullDuplexCall(ctx1)
698 if err != nil {
699 t.Fatalf("Error creating stream 1: %v", err)
700 }
701
702
703 _, err = ss.Client.FullDuplexCall(ctx2)
704 if err != nil {
705 t.Errorf("Error creating stream 2: %v", err)
706 }
707
708 stream1Closed := grpcsync.NewEvent()
709 go func() {
710 _, err := stream1.Recv()
711
712 if status.Code(err) != codes.Canceled {
713 t.Errorf("Expected stream1 to be canceled; got error: %v", err)
714 }
715 stream1Closed.Fire()
716 }()
717
718
719 time.Sleep(200 * time.Millisecond)
720 cancel1()
721
722
723 <-stream1Closed.Done()
724 stream1.CloseSend()
725 stream1.Recv()
726 stream1.Send(&testpb.StreamingOutputCallRequest{})
727 }
728
View as plain text