1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "errors"
20 "io"
21 "runtime"
22 "sync"
23 "testing"
24 "time"
25
26 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
27 "github.com/googleapis/gax-go/v2"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30 "google.golang.org/protobuf/types/descriptorpb"
31 )
32
33 type testRecvResponse struct {
34 resp *storagepb.AppendRowsResponse
35 err error
36 }
37
38 type testAppendRowsClient struct {
39 storagepb.BigQueryWrite_AppendRowsClient
40 openCount int
41 requests []*storagepb.AppendRowsRequest
42 responses []*testRecvResponse
43 sendF func(*storagepb.AppendRowsRequest) error
44 recvF func() (*storagepb.AppendRowsResponse, error)
45 closeF func() error
46 }
47
48 func (tarc *testAppendRowsClient) Send(req *storagepb.AppendRowsRequest) error {
49 return tarc.sendF(req)
50 }
51
52 func (tarc *testAppendRowsClient) Recv() (*storagepb.AppendRowsResponse, error) {
53 return tarc.recvF()
54 }
55
56 func (tarc *testAppendRowsClient) CloseSend() error {
57 return tarc.closeF()
58 }
59
60
61 func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
62 sF := func(req *storagepb.AppendRowsRequest) error {
63 testARC.requests = append(testARC.requests, req)
64 return nil
65 }
66 if sendF != nil {
67 sF = sendF
68 }
69 rF := func() (*storagepb.AppendRowsResponse, error) {
70 return &storagepb.AppendRowsResponse{
71 Response: &storagepb.AppendRowsResponse_AppendResult_{},
72 }, nil
73 }
74 if recvF != nil {
75 rF = recvF
76 }
77 testARC.sendF = sF
78 testARC.recvF = rF
79 testARC.closeF = func() error {
80 return nil
81 }
82 return func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
83 testARC.openCount = testARC.openCount + 1
84
85 go func() {
86 <-ctx.Done()
87 }()
88 return testARC, nil
89 }
90 }
91
92 func TestManagedStream_RequestOptimization(t *testing.T) {
93
94 ctx := context.Background()
95 testARC := &testAppendRowsClient{}
96 pool := &connectionPool{
97 ctx: ctx,
98 open: openTestArc(testARC, nil, nil),
99 baseFlowController: newFlowController(0, 0),
100 }
101 if err := pool.activateRouter(newSimpleRouter("")); err != nil {
102 t.Errorf("activateRouter: %v", err)
103 }
104 ms := &ManagedStream{
105 id: "foo",
106 ctx: ctx,
107 streamSettings: defaultStreamSettings(),
108 }
109 if err := pool.addWriter(ms); err != nil {
110 t.Errorf("addWriter: %v", err)
111 }
112 ms.streamSettings.streamID = "FOO"
113 ms.streamSettings.TraceID = "TRACE"
114 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
115
116 fakeData := [][]byte{
117 []byte("foo"),
118 []byte("bar"),
119 }
120
121 wantReqs := 3
122
123 for i := 0; i < wantReqs; i++ {
124 _, err := ms.AppendRows(ctx, fakeData, WithOffset(int64(i)))
125 if err != nil {
126 t.Errorf("AppendRows; %v", err)
127 }
128 }
129
130 if testARC.openCount != 1 {
131 t.Errorf("expected a single open, got %d", testARC.openCount)
132 }
133
134 if len(testARC.requests) != wantReqs {
135 t.Errorf("expected %d requests, got %d", wantReqs, len(testARC.requests))
136 }
137
138 for k, v := range testARC.requests {
139 if v == nil {
140 t.Errorf("request %d was nil", k)
141 }
142 if v.GetOffset() == nil {
143 t.Errorf("request %d had no offset", k)
144 } else {
145 gotOffset := v.GetOffset().GetValue()
146 if gotOffset != int64(k) {
147 t.Errorf("request %d wanted offset %d, got %d", k, k, gotOffset)
148 }
149 }
150 if k == 0 {
151 if v.GetTraceId() == "" {
152 t.Errorf("expected TraceId on first request, was empty")
153 }
154 if v.GetWriteStream() == "" {
155 t.Errorf("expected WriteStream on first request, was empty")
156 }
157 if v.GetProtoRows().GetWriterSchema().GetProtoDescriptor() == nil {
158 t.Errorf("expected WriterSchema on first request, was empty")
159 }
160
161 } else {
162
163
164 }
165 }
166 }
167
168 func TestManagedStream_FlowControllerFailure(t *testing.T) {
169
170 ctx := context.Background()
171
172 pool := &connectionPool{
173 ctx: ctx,
174 open: openTestArc(&testAppendRowsClient{}, nil, nil),
175 baseFlowController: newFlowController(1, 0),
176 }
177 router := newSimpleRouter("")
178 if err := pool.activateRouter(router); err != nil {
179 t.Errorf("activateRouter: %v", err)
180 }
181
182 ms := &ManagedStream{
183 id: "foo",
184 ctx: ctx,
185 streamSettings: defaultStreamSettings(),
186 }
187 if err := pool.addWriter(ms); err != nil {
188 t.Errorf("addWritre: %v", err)
189 }
190
191
192 router.conn.fc = newFlowController(1, 0)
193 router.conn.fc.acquire(ctx, 0)
194
195 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
196
197 fakeData := [][]byte{
198 []byte("foo"),
199 []byte("bar"),
200 }
201
202
203
204
205 expireCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
206 defer cancel()
207 _, err := ms.AppendRows(expireCtx, fakeData)
208 if err == nil {
209 t.Errorf("expected AppendRows to error, but it succeeded")
210 }
211 }
212
213 func TestManagedStream_AppendWithDeadline(t *testing.T) {
214 ctx := context.Background()
215
216 pool := &connectionPool{
217 ctx: ctx,
218 baseFlowController: newFlowController(0, 0),
219 open: openTestArc(&testAppendRowsClient{},
220 func(req *storagepb.AppendRowsRequest) error {
221
222 time.Sleep(200 * time.Millisecond)
223 return nil
224 }, nil),
225 }
226 router := newSimpleRouter("")
227 if err := pool.activateRouter(router); err != nil {
228 t.Errorf("activateRouter: %v", err)
229 }
230
231 ms := &ManagedStream{
232 id: "foo",
233 ctx: ctx,
234 streamSettings: defaultStreamSettings(),
235 }
236 if err := pool.addWriter(ms); err != nil {
237 t.Errorf("addWriter: %v", err)
238 }
239 conn := router.conn
240 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
241
242 fakeData := [][]byte{
243 []byte("foo"),
244 }
245
246 wantCount := 0
247 if ct := conn.fc.count(); ct != wantCount {
248 t.Errorf("flowcontroller count mismatch, got %d want %d", ct, wantCount)
249 }
250
251
252
253 expireCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
254 defer cancel()
255
256 _, err := ms.AppendRows(expireCtx, fakeData)
257 if err == nil {
258 t.Errorf("expected AppendRows to error, but it succeeded")
259 }
260
261
262 wantCount = 1
263 if ct := conn.fc.count(); ct != wantCount {
264 t.Errorf("flowcontroller post-append count mismatch, got %d want %d", ct, wantCount)
265 }
266
267
268 time.Sleep(300 * time.Millisecond)
269 wantCount = 0
270 if ct := conn.fc.count(); ct != wantCount {
271 t.Errorf("flowcontroller post-append count mismatch, got %d want %d", ct, wantCount)
272 }
273 }
274
275 func TestManagedStream_ContextExpiry(t *testing.T) {
276
277
278 ctx := context.Background()
279
280 pool := &connectionPool{
281 ctx: ctx,
282 baseFlowController: newFlowController(0, 0),
283 open: openTestArc(&testAppendRowsClient{},
284 func(req *storagepb.AppendRowsRequest) error {
285 return nil
286 }, nil),
287 }
288 if err := pool.activateRouter(newSimpleRouter("")); err != nil {
289 t.Errorf("activateRouter: %v", err)
290 }
291
292 ms := &ManagedStream{
293 id: "foo",
294 ctx: ctx,
295 streamSettings: defaultStreamSettings(),
296 }
297 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
298 if err := pool.addWriter(ms); err != nil {
299 t.Errorf("addWriter: %v", err)
300 }
301
302 fakeData := [][]byte{
303 []byte("foo"),
304 }
305 fakeReq := &storagepb.AppendRowsRequest{
306 Rows: &storagepb.AppendRowsRequest_ProtoRows{
307 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
308 Rows: &storagepb.ProtoRows{
309 SerializedRows: fakeData,
310 },
311 },
312 },
313 }
314
315
316 cancelCtx, cancel := context.WithCancel(ctx)
317 cancel()
318
319
320 pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curTemplate, "", "")
321 err := ms.appendWithRetry(pw)
322 if err != context.Canceled {
323 t.Errorf("expected cancelled context error, got: %v", err)
324 }
325
326
327 _, err = ms.AppendRows(ctx, fakeData)
328 if err != nil {
329 t.Errorf("expected second append to succeed, but failed: %v", err)
330 }
331 }
332
333 func TestManagedStream_AppendDeadlocks(t *testing.T) {
334
335 testCases := []struct {
336 desc string
337 openErrors []error
338 ctx context.Context
339 respErr error
340 }{
341 {
342 desc: "no errors",
343 openErrors: []error{nil, nil},
344 ctx: context.Background(),
345 respErr: nil,
346 },
347 {
348 desc: "cancelled caller context",
349 openErrors: []error{nil, nil},
350 ctx: func() context.Context {
351 cctx, cancel := context.WithCancel(context.Background())
352 cancel()
353 return cctx
354 }(),
355 respErr: context.Canceled,
356 },
357 {
358 desc: "expired caller context",
359 openErrors: []error{nil, nil},
360 ctx: func() context.Context {
361 cctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
362 defer cancel()
363 time.Sleep(2 * time.Millisecond)
364 return cctx
365 }(),
366 respErr: context.DeadlineExceeded,
367 },
368 {
369 desc: "errored getstream",
370 openErrors: []error{status.Errorf(codes.ResourceExhausted, "some error"), status.Errorf(codes.ResourceExhausted, "some error")},
371 ctx: context.Background(),
372 respErr: status.Errorf(codes.ResourceExhausted, "some error"),
373 },
374 }
375
376 for _, tc := range testCases {
377 ctx := context.Background()
378 openF := openTestArc(&testAppendRowsClient{}, nil, nil)
379 pool := &connectionPool{
380 ctx: ctx,
381 open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
382 if len(tc.openErrors) == 0 {
383 panic("out of open errors")
384 }
385 curErr := tc.openErrors[0]
386 tc.openErrors = tc.openErrors[1:]
387 if curErr == nil {
388 return openF(ctx, opts...)
389 }
390 return nil, curErr
391 },
392 }
393 router := newSimpleRouter("")
394 if err := pool.activateRouter(router); err != nil {
395 t.Errorf("activateRouter: %v", err)
396 }
397 ms := &ManagedStream{
398 id: "foo",
399 streamSettings: &streamSettings{
400 streamID: "foo",
401 },
402 }
403 ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
404 if err := pool.addWriter(ms); err != nil {
405 t.Errorf("addWriter: %v", err)
406 }
407
408 testReq := ms.buildRequest([][]byte{[]byte("foo")})
409
410 pw := newPendingWrite(tc.ctx, ms, testReq, nil, "", "")
411 gotErr := ms.appendWithRetry(pw)
412 if !errors.Is(gotErr, tc.respErr) {
413 t.Errorf("%s first response: got %v, want %v", tc.desc, gotErr, tc.respErr)
414 }
415
416 pw = newPendingWrite(tc.ctx, ms, testReq, nil, "", "")
417 gotErr = ms.appendWithRetry(pw)
418 if !errors.Is(gotErr, tc.respErr) {
419 t.Errorf("%s second response: got %v, want %v", tc.desc, gotErr, tc.respErr)
420 }
421
422
423 ms.Close()
424 ms.Close()
425
426
427 gotErr = ms.appendWithRetry(pw)
428 if !errors.Is(gotErr, io.EOF) {
429 t.Errorf("expected io.EOF, got %v", gotErr)
430 }
431 gotErr = ms.appendWithRetry(pw)
432 if !errors.Is(gotErr, io.EOF) {
433 t.Errorf("expected io.EOF, got %v", gotErr)
434 }
435
436 }
437
438 }
439
440 func TestManagedStream_LeakingGoroutines(t *testing.T) {
441 ctx := context.Background()
442
443 pool := &connectionPool{
444 ctx: ctx,
445 open: openTestArc(&testAppendRowsClient{},
446 func(req *storagepb.AppendRowsRequest) error {
447
448 time.Sleep(40 * time.Millisecond)
449 return nil
450 }, nil),
451 baseFlowController: newFlowController(10, 0),
452 }
453 if err := pool.activateRouter(newSimpleRouter("")); err != nil {
454 t.Errorf("activateRouter: %v", err)
455 }
456 ms := &ManagedStream{
457 id: "foo",
458 ctx: ctx,
459 streamSettings: defaultStreamSettings(),
460 }
461 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
462 if err := pool.addWriter(ms); err != nil {
463 t.Errorf("addWriter: %v", err)
464 }
465
466 fakeData := [][]byte{
467 []byte("foo"),
468 }
469
470 threshold := runtime.NumGoroutine() + 20
471
472
473
474 for i := 0; i < 250; i++ {
475 expireCtx, cancel := context.WithTimeout(ctx, 25*time.Millisecond)
476 defer cancel()
477 ms.AppendRows(expireCtx, fakeData)
478 if i%50 == 0 {
479 if current := runtime.NumGoroutine(); current > threshold {
480 t.Errorf("potential goroutine leak, append %d: current %d, threshold %d", i, current, threshold)
481 }
482 }
483 }
484 }
485
486 func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) {
487 ctx := context.Background()
488
489 reqCount := 0
490 testArc := &testAppendRowsClient{}
491 pool := &connectionPool{
492 ctx: ctx,
493 open: openTestArc(testArc,
494 func(req *storagepb.AppendRowsRequest) error {
495 reqCount++
496 if reqCount%2 == 1 {
497 return io.EOF
498 }
499 return nil
500 }, nil),
501 baseFlowController: newFlowController(1000, 0),
502 }
503 if err := pool.activateRouter(newSimpleRouter("")); err != nil {
504 t.Errorf("activateRouter: %v", err)
505 }
506 ms := &ManagedStream{
507 id: "foo",
508 ctx: ctx,
509 streamSettings: defaultStreamSettings(),
510 retry: newStatelessRetryer(),
511 }
512 ms.retry.maxAttempts = 4
513 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
514 if err := pool.addWriter(ms); err != nil {
515 t.Errorf("addWriter: %v", err)
516 }
517
518 fakeData := [][]byte{
519 []byte("foo"),
520 }
521
522 threshold := runtime.NumGoroutine() + 5
523
524
525
526 for i := 0; i < 30; i++ {
527 writeCtx := context.Background()
528 r, err := ms.AppendRows(writeCtx, fakeData)
529 if err != nil {
530 t.Fatalf("failed to append row: %v", err)
531 }
532 _, err = r.GetResult(context.Background())
533 if err != nil {
534 t.Fatalf("failed to get result: %v", err)
535 }
536 if r.totalAttempts != 2 {
537 t.Fatalf("should trigger a retry, but found: %d attempts", r.totalAttempts)
538 }
539 if testArc.openCount != i+2 {
540 t.Errorf("should trigger a reconnect, but found openCount %d", testArc.openCount)
541 }
542 if i%10 == 0 {
543 if current := runtime.NumGoroutine(); current > threshold {
544 t.Errorf("potential goroutine leak, append %d: current %d, threshold %d", i, current, threshold)
545 }
546 }
547 }
548 }
549
550 func TestManagedWriter_CancellationDuringRetry(t *testing.T) {
551
552
553 ctx, cancel := context.WithCancel(context.Background())
554 pool := &connectionPool{
555 ctx: ctx,
556 open: openTestArc(&testAppendRowsClient{},
557 func(req *storagepb.AppendRowsRequest) error {
558
559 time.Sleep(time.Second)
560 return nil
561 },
562 func() (*storagepb.AppendRowsResponse, error) {
563
564 time.Sleep(2 * time.Second)
565 return nil, io.EOF
566 }),
567 baseFlowController: newFlowController(10, 0),
568 }
569 if err := pool.activateRouter(newSimpleRouter("")); err != nil {
570 t.Errorf("activateRouter: %v", err)
571 }
572
573 ms := &ManagedStream{
574 id: "foo",
575 ctx: ctx,
576 streamSettings: defaultStreamSettings(),
577 retry: newStatelessRetryer(),
578 }
579 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
580 if err := pool.addWriter(ms); err != nil {
581 t.Errorf("addWriter: %v", err)
582 }
583
584 fakeData := [][]byte{
585 []byte("foo"),
586 }
587
588 res, err := ms.AppendRows(context.Background(), fakeData)
589 if err != nil {
590 t.Errorf("AppendRows send err: %v", err)
591 }
592 cancel()
593
594 select {
595
596 case <-res.Ready():
597 if _, err := res.GetResult(context.Background()); err == nil {
598 t.Errorf("expected failure, got success")
599 }
600
601 case <-time.After(5 * time.Second):
602 t.Errorf("result was not ready in expected time")
603 }
604 }
605
606 func TestManagedStream_Closure(t *testing.T) {
607 ctx, cancel := context.WithCancel(context.Background())
608
609 pool := &connectionPool{
610 ctx: ctx,
611 cancel: cancel,
612 baseFlowController: newFlowController(0, 0),
613 open: openTestArc(&testAppendRowsClient{},
614 func(req *storagepb.AppendRowsRequest) error {
615 return nil
616 }, nil),
617 }
618 router := newSimpleRouter("")
619 if err := pool.activateRouter(router); err != nil {
620 t.Errorf("activateRouter: %v", err)
621 }
622
623 ms := &ManagedStream{
624 id: "foo",
625 streamSettings: defaultStreamSettings(),
626 }
627 ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
628 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
629 if err := pool.addWriter(ms); err != nil {
630 t.Errorf("addWriter A: %v", err)
631 }
632
633 if router.conn == nil {
634 t.Errorf("expected non-nil connection")
635 }
636
637 if err := ms.Close(); err != io.EOF {
638 t.Errorf("msB.Close, want %v got %v", io.EOF, err)
639 }
640 if router.conn != nil {
641 t.Errorf("expected nil connection")
642 }
643 if ms.ctx.Err() == nil {
644 t.Errorf("expected writer ctx to be dead, is alive")
645 }
646 }
647
648
649
650
651 func TestManagedStream_RaceFinder(t *testing.T) {
652 ctx, cancel := context.WithCancel(context.Background())
653
654 var totalsMu sync.Mutex
655 totalSends := 0
656 totalRecvs := 0
657 pool := &connectionPool{
658 ctx: ctx,
659 cancel: cancel,
660 baseFlowController: newFlowController(0, 0),
661 open: openTestArc(&testAppendRowsClient{},
662 func(req *storagepb.AppendRowsRequest) error {
663 totalsMu.Lock()
664 totalSends = totalSends + 1
665 curSends := totalSends
666 totalsMu.Unlock()
667 if curSends%25 == 0 {
668
669 return io.EOF
670 }
671 return nil
672 },
673 func() (*storagepb.AppendRowsResponse, error) {
674 totalsMu.Lock()
675 totalRecvs = totalRecvs + 1
676 curRecvs := totalRecvs
677 totalsMu.Unlock()
678 if curRecvs%15 == 0 {
679 return nil, io.EOF
680 }
681 return &storagepb.AppendRowsResponse{}, nil
682 }),
683 }
684 router := newSimpleRouter("")
685 if err := pool.activateRouter(router); err != nil {
686 t.Errorf("activateRouter: %v", err)
687 }
688
689 ms := &ManagedStream{
690 id: "foo",
691 streamSettings: defaultStreamSettings(),
692 retry: newStatelessRetryer(),
693 }
694 ms.retry.maxAttempts = 4
695 ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
696 ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
697 if err := pool.addWriter(ms); err != nil {
698 t.Errorf("addWriter A: %v", err)
699 }
700
701 if router.conn == nil {
702 t.Errorf("expected non-nil connection")
703 }
704
705 numWriters := 5
706 numWrites := 15
707
708 var wg sync.WaitGroup
709 wg.Add(numWriters)
710 for i := 0; i < numWriters; i++ {
711 go func() {
712 for j := 0; j < numWrites; j++ {
713 result, err := ms.AppendRows(ctx, [][]byte{[]byte("foo")})
714 if err != nil {
715 continue
716 }
717 _, err = result.GetResult(ctx)
718 if err != nil {
719 continue
720 }
721 }
722 wg.Done()
723 }()
724 }
725 wg.Wait()
726 cancel()
727 }
728
View as plain text