1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "testing"
23 "time"
24
25 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
26 gax "github.com/googleapis/gax-go/v2"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29 )
30
31 func TestStorageIteratorRetry(t *testing.T) {
32 cancelledCtx, cancel := context.WithCancel(context.Background())
33 cancel()
34 testCases := []struct {
35 ctx context.Context
36 desc string
37 errors []error
38 wantFail bool
39 }{
40 {
41 desc: "no error",
42 errors: []error{},
43 wantFail: false,
44 },
45 {
46 desc: "transient failures",
47 errors: []error{
48 status.Errorf(codes.DeadlineExceeded, "try 1"),
49 status.Errorf(codes.Unavailable, "try 2"),
50 status.Errorf(codes.Canceled, "try 3"),
51 status.Errorf(codes.Internal, "try 4"),
52 },
53 wantFail: false,
54 },
55 {
56 desc: "not enough permission",
57 errors: []error{
58 status.Errorf(codes.PermissionDenied, "the user does not have 'bigquery.readsessions.getData' permission"),
59 },
60 wantFail: true,
61 },
62 {
63 desc: "permanent error",
64 errors: []error{
65 status.Errorf(codes.InvalidArgument, "invalid args"),
66 },
67 wantFail: true,
68 },
69 {
70 ctx: cancelledCtx,
71 desc: "context cancelled/deadline exceeded",
72 errors: []error{
73 fmt.Errorf("random error"),
74 fmt.Errorf("another random error"),
75 fmt.Errorf("yet another random error"),
76 },
77 wantFail: true,
78 },
79 }
80 for _, tc := range testCases {
81 rrc := &testReadRowsClient{
82 errors: tc.errors,
83 }
84 readRowsFuncs := map[string]func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error){
85 "readRows fail on first call": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
86 if len(tc.errors) == 0 {
87 return &testReadRowsClient{}, nil
88 }
89 err := tc.errors[0]
90 tc.errors = tc.errors[1:]
91 if err != nil {
92 return nil, err
93 }
94 return &testReadRowsClient{}, nil
95 },
96 "readRows fails on Recv": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
97 return rrc, nil
98 },
99 }
100 for readRowsFuncType, readRowsFunc := range readRowsFuncs {
101 baseCtx := tc.ctx
102 if baseCtx == nil {
103 baseCtx = context.Background()
104 }
105 ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
106 defer cancel()
107
108 it, err := newRawStorageRowIterator(&readSession{
109 ctx: ctx,
110 settings: defaultReadClientSettings(),
111 readRowsFunc: readRowsFunc,
112 bqSession: &storagepb.ReadSession{},
113 }, Schema{})
114 if err != nil {
115 t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
116 }
117
118 it.processStream("test-stream")
119
120 if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) {
121 if tc.wantFail {
122 continue
123 }
124 t.Fatalf("case %s(%s): deadline exceeded", tc.desc, readRowsFuncType)
125 }
126 if tc.wantFail && len(it.errs) == 0 {
127 t.Fatalf("case %s(%s):want test to fail, but found no errors", tc.desc, readRowsFuncType)
128 }
129 if !tc.wantFail && len(it.errs) > 0 {
130 t.Fatalf("case %s(%s):test should not fail, but found %d errors", tc.desc, readRowsFuncType, len(it.errs))
131 }
132 }
133 }
134 }
135
136 type testReadRowsClient struct {
137 storagepb.BigQueryRead_ReadRowsClient
138 responses []*storagepb.ReadRowsResponse
139 errors []error
140 }
141
142 func (trrc *testReadRowsClient) Recv() (*storagepb.ReadRowsResponse, error) {
143 if len(trrc.errors) > 0 {
144 err := trrc.errors[0]
145 trrc.errors = trrc.errors[1:]
146 return nil, err
147 }
148 if len(trrc.responses) > 0 {
149 r := trrc.responses[0]
150 trrc.responses = trrc.responses[:1]
151 return r, nil
152 }
153 return nil, io.EOF
154 }
155
View as plain text