1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "testing"
23 "time"
24
25 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
26 "github.com/googleapis/gax-go/v2"
27 "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
28 statuspb "google.golang.org/genproto/googleapis/rpc/status"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/status"
32 )
33
34 func TestConnection_OpenWithRetry(t *testing.T) {
35
36 testCases := []struct {
37 desc string
38 errors []error
39 wantFail bool
40 }{
41 {
42 desc: "no error",
43 errors: []error{nil},
44 wantFail: false,
45 },
46 {
47 desc: "transient failures",
48 errors: []error{
49 status.Errorf(codes.Unavailable, "try 1"),
50 status.Errorf(codes.Unavailable, "try 2"),
51 nil},
52 wantFail: false,
53 },
54 {
55 desc: "terminal error",
56 errors: []error{status.Errorf(codes.InvalidArgument, "bad args")},
57 wantFail: true,
58 },
59 }
60
61 for _, tc := range testCases {
62 pool := &connectionPool{
63 ctx: context.Background(),
64 open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
65 if len(tc.errors) == 0 {
66 panic("out of errors")
67 }
68 err := tc.errors[0]
69 tc.errors = tc.errors[1:]
70 if err == nil {
71 return &testAppendRowsClient{}, nil
72 }
73 return nil, err
74 },
75 }
76 if err := pool.activateRouter(newSimpleRouter("")); err != nil {
77 t.Errorf("activateRouter: %v", err)
78 }
79 writer := &ManagedStream{id: "foo"}
80 if err := pool.addWriter(writer); err != nil {
81 t.Errorf("addWriter: %v", err)
82 }
83
84 conn, err := pool.router.pickConnection(nil)
85 if err != nil {
86 t.Errorf("case %s, failed to add connection: %v", tc.desc, err)
87 }
88 arc, ch, err := pool.openWithRetry(conn)
89 if tc.wantFail && err == nil {
90 t.Errorf("case %s: wanted failure, got success", tc.desc)
91 }
92 if !tc.wantFail && err != nil {
93 t.Errorf("case %s: wanted success, got %v", tc.desc, err)
94 }
95 if err == nil {
96 if arc == nil {
97 t.Errorf("case %s: expected append client, got nil", tc.desc)
98 }
99 if ch == nil {
100 t.Errorf("case %s: expected channel, got nil", tc.desc)
101 }
102 }
103 }
104 }
105
106
107
108 func TestConnection_LockingAppendFlowRelease(t *testing.T) {
109 ctx := context.Background()
110
111 pool := &connectionPool{
112 ctx: ctx,
113 baseFlowController: newFlowController(10, 0),
114 open: openTestArc(&testAppendRowsClient{},
115 func(req *storagepb.AppendRowsRequest) error {
116
117 return io.EOF
118 }, nil),
119 }
120 router := newSimpleRouter("")
121 if err := pool.activateRouter(router); err != nil {
122 t.Errorf("activateRouter: %v", err)
123 }
124
125 writer := &ManagedStream{id: "foo", ctx: ctx}
126 if err := pool.addWriter(writer); err != nil {
127 t.Errorf("addWriter: %v", err)
128 }
129
130 pw := newPendingWrite(ctx, writer, &storagepb.AppendRowsRequest{WriteStream: "somestream"}, newVersionedTemplate(), "", "")
131 for i := 0; i < 5; i++ {
132 conn, err := router.pool.selectConn(pw)
133 if err != nil {
134 t.Errorf("selectConn: %v", err)
135 }
136
137
138 if got := conn.fc.count(); got != 0 {
139 t.Errorf("attempt %d expected empty flow count, got %d", i, got)
140 }
141 if got := conn.fc.bytes(); got != 0 {
142 t.Errorf("attempt %d expected empty flow bytes, got %d", i, got)
143 }
144
145 if err := conn.lockingAppend(pw); err != io.EOF {
146 t.Errorf("lockingAppend attempt %d: expected io.EOF, got %v", i, err)
147 }
148
149 if got := conn.fc.count(); got != 0 {
150 t.Errorf("attempt %d expected empty flow count, got %d", i, got)
151 }
152 if got := conn.fc.bytes(); got != 0 {
153 t.Errorf("attempt %d expected empty flow bytes, got %d", i, got)
154 }
155 }
156 }
157
158
159
160 func TestConnection_LeakingReconnect(t *testing.T) {
161
162 ctx := context.Background()
163
164 pool := &connectionPool{
165 ctx: ctx,
166 baseFlowController: newFlowController(10, 0),
167 open: openTestArc(&testAppendRowsClient{},
168 func(req *storagepb.AppendRowsRequest) error {
169
170 return io.EOF
171 }, nil),
172 }
173 router := newSimpleRouter("")
174 if err := pool.activateRouter(router); err != nil {
175 t.Errorf("activateRouter: %v", err)
176 }
177 writer := &ManagedStream{id: "foo"}
178 if err := pool.addWriter(writer); err != nil {
179 t.Errorf("addWriter: %v", err)
180 }
181
182 var chans []chan *pendingWrite
183
184 for i := 0; i < 10; i++ {
185 _, ch, err := router.conn.getStream(nil, true)
186 if err != nil {
187 t.Fatalf("failed getStream(%d): %v", i, err)
188 }
189 chans = append(chans, ch)
190 }
191 var closedCount int
192 for _, ch := range chans {
193 select {
194 case _, ok := <-ch:
195 if !ok {
196 closedCount = closedCount + 1
197 }
198 case <-time.After(time.Second):
199
200 continue
201 }
202 }
203 if wantClosed := len(chans) - 1; wantClosed != closedCount {
204 t.Errorf("closed count mismatch, got %d want %d", closedCount, wantClosed)
205 }
206 }
207
208
209
210 func TestConnectionPool_OpenCallOptionPropagation(t *testing.T) {
211 ctx, cancel := context.WithCancel(context.Background())
212 cancel()
213
214 pool := &connectionPool{
215 ctx: ctx,
216 cancel: cancel,
217 open: createOpenF(func(ctx context.Context, opts ...gax.CallOption) (storage.BigQueryWrite_AppendRowsClient, error) {
218 if len(opts) == 0 {
219 t.Fatalf("no options were propagated")
220 }
221 return nil, fmt.Errorf("no real client")
222 }, ""),
223 callOptions: []gax.CallOption{
224 gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)),
225 },
226 }
227 conn := newConnection(pool, "", nil)
228 pool.openWithRetry(conn)
229 }
230
231
232 func TestConnection_Receiver(t *testing.T) {
233
234 var customErr = fmt.Errorf("foo")
235
236 testCases := []struct {
237 description string
238 recvResp []*testRecvResponse
239 wantFinalErr error
240 wantTotalAttempts int
241 }{
242 {
243 description: "no errors",
244 recvResp: []*testRecvResponse{
245 {
246 resp: &storagepb.AppendRowsResponse{},
247 err: nil,
248 },
249 },
250 wantTotalAttempts: 1,
251 },
252 {
253 description: "recv err w/io.EOF",
254 recvResp: []*testRecvResponse{
255 {
256 resp: nil,
257 err: io.EOF,
258 },
259 {
260 resp: &storagepb.AppendRowsResponse{},
261 err: nil,
262 },
263 },
264 wantTotalAttempts: 2,
265 },
266 {
267 description: "recv err retried and then failed",
268 recvResp: []*testRecvResponse{
269 {
270 resp: nil,
271 err: io.EOF,
272 },
273 {
274 resp: nil,
275 err: customErr,
276 },
277 },
278 wantTotalAttempts: 2,
279 wantFinalErr: customErr,
280 },
281 {
282 description: "recv err w/ custom error",
283 recvResp: []*testRecvResponse{
284 {
285 resp: nil,
286 err: customErr,
287 },
288 {
289 resp: &storagepb.AppendRowsResponse{},
290 err: nil,
291 },
292 },
293 wantTotalAttempts: 1,
294 wantFinalErr: customErr,
295 },
296
297 {
298 description: "resp embeds Unavailable",
299 recvResp: []*testRecvResponse{
300 {
301 resp: &storagepb.AppendRowsResponse{
302 Response: &storagepb.AppendRowsResponse_Error{
303 Error: &statuspb.Status{
304 Code: int32(codes.Unavailable),
305 Message: "foo",
306 },
307 },
308 },
309 err: nil,
310 },
311 {
312 resp: &storagepb.AppendRowsResponse{},
313 err: nil,
314 },
315 },
316 wantTotalAttempts: 2,
317 },
318 {
319 description: "resp embeds generic ResourceExhausted",
320 recvResp: []*testRecvResponse{
321 {
322 resp: &storagepb.AppendRowsResponse{
323 Response: &storagepb.AppendRowsResponse_Error{
324 Error: &statuspb.Status{
325 Code: int32(codes.ResourceExhausted),
326 Message: "foo",
327 },
328 },
329 },
330 err: nil,
331 },
332 },
333 wantTotalAttempts: 1,
334 wantFinalErr: func() error {
335 return status.ErrorProto(&statuspb.Status{
336 Code: int32(codes.ResourceExhausted),
337 Message: "foo",
338 })
339 }(),
340 },
341 {
342 description: "resp embeds throughput ResourceExhausted",
343 recvResp: []*testRecvResponse{
344 {
345 resp: &storagepb.AppendRowsResponse{
346 Response: &storagepb.AppendRowsResponse_Error{
347 Error: &statuspb.Status{
348 Code: int32(codes.ResourceExhausted),
349 Message: "Exceeds 'AppendRows throughput' quota for stream blah",
350 },
351 },
352 },
353 err: nil,
354 },
355 {
356 resp: &storagepb.AppendRowsResponse{},
357 err: nil,
358 },
359 },
360 wantTotalAttempts: 2,
361 },
362 {
363 description: "retriable failures until max attempts",
364 recvResp: []*testRecvResponse{
365 {
366 err: io.EOF,
367 },
368 {
369 err: io.EOF,
370 },
371 {
372 err: io.EOF,
373 },
374 {
375 err: io.EOF,
376 },
377 },
378 wantTotalAttempts: 4,
379 wantFinalErr: io.EOF,
380 },
381 }
382
383 for _, tc := range testCases {
384 ctx, cancel := context.WithCancel(context.Background())
385
386 testArc := &testAppendRowsClient{
387 responses: tc.recvResp,
388 }
389
390 pool := &connectionPool{
391 ctx: ctx,
392 open: openTestArc(testArc, nil,
393 func() (*storagepb.AppendRowsResponse, error) {
394 if len(testArc.responses) == 0 {
395 panic("out of responses")
396 }
397 curResp := testArc.responses[0]
398 testArc.responses = testArc.responses[1:]
399 return curResp.resp, curResp.err
400 },
401 ),
402 baseFlowController: newFlowController(0, 0),
403 }
404 router := newSimpleRouter("")
405 if err := pool.activateRouter(router); err != nil {
406 t.Errorf("activateRouter: %v", err)
407 }
408
409 ms := &ManagedStream{
410 id: "foo",
411 ctx: ctx,
412 retry: newStatelessRetryer(),
413 }
414 if err := pool.addWriter(ms); err != nil {
415 t.Errorf("addWriter: %v", err)
416 }
417 conn := router.conn
418
419 _, ch, _ := pool.openWithRetry(conn)
420 pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")
421 pw.writer = ms
422 pw.attemptCount = 1
423 ch <- pw
424
425
426 <-pw.result.Ready()
427
428
429 gotTotalAttempts, err := pw.result.TotalAttempts(ctx)
430 if err != nil {
431 t.Errorf("%s: failed to get total attempts: %v", tc.description, err)
432 }
433 if gotTotalAttempts != tc.wantTotalAttempts {
434 t.Errorf("%s: got %d total attempts, want %d attempts", tc.description, gotTotalAttempts, tc.wantTotalAttempts)
435 }
436
437
438 if gotFinalErr := pw.result.err; !errors.Is(gotFinalErr, tc.wantFinalErr) {
439 t.Errorf("%s: got final error %v, wanted final error %v", tc.description, gotFinalErr, tc.wantFinalErr)
440 }
441 cancel()
442 }
443 }
444
View as plain text