...
1
18
19 package test
20
21 import (
22 "context"
23 "testing"
24 "time"
25
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/encoding/gzip"
29 "google.golang.org/grpc/internal/stubserver"
30 "google.golang.org/grpc/metadata"
31 "google.golang.org/grpc/status"
32
33 testgrpc "google.golang.org/grpc/interop/grpc_testing"
34 testpb "google.golang.org/grpc/interop/grpc_testing"
35 )
36
37 func (s) TestContextCanceled(t *testing.T) {
38 ss := &stubserver.StubServer{
39 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
40 stream.SetTrailer(metadata.New(map[string]string{"a": "b"}))
41 return status.Error(codes.PermissionDenied, "perm denied")
42 },
43 }
44 if err := ss.Start(nil); err != nil {
45 t.Fatalf("Error starting endpoint server: %v", err)
46 }
47 defer ss.Stop()
48
49
50
51 const cntRetry uint = 10
52 runTest := func(delay time.Duration) (cntCanceled, cntPermDenied uint) {
53 for i := uint(0); i < cntRetry; i++ {
54 ctx, cancel := context.WithTimeout(context.Background(), delay)
55 defer cancel()
56
57 str, err := ss.Client.FullDuplexCall(ctx)
58 if err != nil {
59 continue
60 }
61
62 _, err = str.Recv()
63 if err == nil {
64 t.Fatalf("non-nil error expected from Recv()")
65 }
66
67 _, trlOk := str.Trailer()["a"]
68 switch status.Code(err) {
69 case codes.PermissionDenied:
70 if !trlOk {
71 t.Fatalf(`status err: %v; wanted key "a" in trailer but didn't get it`, err)
72 }
73 cntPermDenied++
74 case codes.DeadlineExceeded:
75 if trlOk {
76 t.Fatalf(`status err: %v; didn't want key "a" in trailer but got it`, err)
77 }
78 cntCanceled++
79 default:
80 t.Fatalf(`unexpected status err: %v`, err)
81 }
82 }
83 return cntCanceled, cntPermDenied
84 }
85
86
87 canceledOk, permDeniedOk := false, false
88 for lower, upper := time.Duration(0), 2*time.Millisecond; lower <= upper; {
89 delay := lower + (upper-lower)/2
90 cntCanceled, cntPermDenied := runTest(delay)
91 if cntPermDenied > 0 && cntCanceled > 0 {
92
93 return
94 }
95
96
97 if cntCanceled > 0 {
98 canceledOk = true
99 }
100 if cntPermDenied > 0 {
101 permDeniedOk = true
102 }
103
104 if cntPermDenied == 0 {
105
106 lower += (upper-lower)/10 + 1
107 } else {
108
109 upper -= (upper-lower)/10 + 1
110 }
111 }
112
113 if !canceledOk || !permDeniedOk {
114 t.Fatalf(`couldn't find the delay that causes canceled/perm denied race.`)
115 }
116 }
117
118
119
120
121
122
123
124
125
126 func (s) TestCancelWhileRecvingWithCompression(t *testing.T) {
127 ss := &stubserver.StubServer{
128 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
129 for {
130 if err := stream.Send(&testpb.StreamingOutputCallResponse{
131 Payload: nil,
132 }); err != nil {
133 return err
134 }
135 }
136 },
137 }
138 if err := ss.Start(nil); err != nil {
139 t.Fatalf("Error starting endpoint server: %v", err)
140 }
141 defer ss.Stop()
142
143 for i := 0; i < 10; i++ {
144 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
145 s, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name))
146 if err != nil {
147 t.Fatalf("failed to start bidi streaming RPC: %v", err)
148 }
149
150 time.AfterFunc(time.Millisecond, cancel)
151 for {
152 _, err := s.Recv()
153 if err != nil {
154 if status.Code(err) != codes.Canceled {
155 t.Fatalf("recv failed with %v, want Canceled", err)
156 }
157 break
158 }
159 }
160 }
161 }
162
View as plain text