1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package storage
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "net/url"
24 "time"
25
26 storagepb "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
27 gax "github.com/googleapis/gax-go/v2"
28 "google.golang.org/api/option"
29 "google.golang.org/api/option/internaloption"
30 gtransport "google.golang.org/api/transport/grpc"
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/codes"
33 )
34
35 var newBigQueryReadClientHook clientHook
36
37
38 type BigQueryReadCallOptions struct {
39 CreateReadSession []gax.CallOption
40 ReadRows []gax.CallOption
41 SplitReadStream []gax.CallOption
42 }
43
44 func defaultBigQueryReadGRPCClientOptions() []option.ClientOption {
45 return []option.ClientOption{
46 internaloption.WithDefaultEndpoint("bigquerystorage.googleapis.com:443"),
47 internaloption.WithDefaultEndpointTemplate("bigquerystorage.UNIVERSE_DOMAIN:443"),
48 internaloption.WithDefaultMTLSEndpoint("bigquerystorage.mtls.googleapis.com:443"),
49 internaloption.WithDefaultUniverseDomain("googleapis.com"),
50 internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
51 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
52 internaloption.EnableJwtWithScope(),
53 option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
54 grpc.MaxCallRecvMsgSize(math.MaxInt32))),
55 }
56 }
57
58 func defaultBigQueryReadCallOptions() *BigQueryReadCallOptions {
59 return &BigQueryReadCallOptions{
60 CreateReadSession: []gax.CallOption{
61 gax.WithTimeout(600000 * time.Millisecond),
62 gax.WithRetry(func() gax.Retryer {
63 return gax.OnCodes([]codes.Code{
64 codes.DeadlineExceeded,
65 codes.Unavailable,
66 }, gax.Backoff{
67 Initial: 100 * time.Millisecond,
68 Max: 60000 * time.Millisecond,
69 Multiplier: 1.30,
70 })
71 }),
72 },
73 ReadRows: []gax.CallOption{
74 gax.WithRetry(func() gax.Retryer {
75 return gax.OnCodes([]codes.Code{
76 codes.Unavailable,
77 }, gax.Backoff{
78 Initial: 100 * time.Millisecond,
79 Max: 60000 * time.Millisecond,
80 Multiplier: 1.30,
81 })
82 }),
83 },
84 SplitReadStream: []gax.CallOption{
85 gax.WithTimeout(600000 * time.Millisecond),
86 gax.WithRetry(func() gax.Retryer {
87 return gax.OnCodes([]codes.Code{
88 codes.DeadlineExceeded,
89 codes.Unavailable,
90 }, gax.Backoff{
91 Initial: 100 * time.Millisecond,
92 Max: 60000 * time.Millisecond,
93 Multiplier: 1.30,
94 })
95 }),
96 },
97 }
98 }
99
100
101 type internalBigQueryReadClient interface {
102 Close() error
103 setGoogleClientInfo(...string)
104 Connection() *grpc.ClientConn
105 CreateReadSession(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error)
106 ReadRows(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error)
107 SplitReadStream(context.Context, *storagepb.SplitReadStreamRequest, ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error)
108 }
109
110
111
112
113
114
115
116 type BigQueryReadClient struct {
117
118 internalClient internalBigQueryReadClient
119
120
121 CallOptions *BigQueryReadCallOptions
122 }
123
124
125
126
127
128 func (c *BigQueryReadClient) Close() error {
129 return c.internalClient.Close()
130 }
131
132
133
134
135 func (c *BigQueryReadClient) setGoogleClientInfo(keyval ...string) {
136 c.internalClient.setGoogleClientInfo(keyval...)
137 }
138
139
140
141
142
143 func (c *BigQueryReadClient) Connection() *grpc.ClientConn {
144 return c.internalClient.Connection()
145 }
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 func (c *BigQueryReadClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
167 return c.internalClient.CreateReadSession(ctx, req, opts...)
168 }
169
170
171
172
173
174
175
176
177 func (c *BigQueryReadClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
178 return c.internalClient.ReadRows(ctx, req, opts...)
179 }
180
181
182
183
184
185
186
187
188
189
190
191
192
193 func (c *BigQueryReadClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
194 return c.internalClient.SplitReadStream(ctx, req, opts...)
195 }
196
197
198
199
200 type bigQueryReadGRPCClient struct {
201
202 connPool gtransport.ConnPool
203
204
205 CallOptions **BigQueryReadCallOptions
206
207
208 bigQueryReadClient storagepb.BigQueryReadClient
209
210
211 xGoogHeaders []string
212 }
213
214
215
216
217
218
219
220 func NewBigQueryReadClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryReadClient, error) {
221 clientOpts := defaultBigQueryReadGRPCClientOptions()
222 if newBigQueryReadClientHook != nil {
223 hookOpts, err := newBigQueryReadClientHook(ctx, clientHookParams{})
224 if err != nil {
225 return nil, err
226 }
227 clientOpts = append(clientOpts, hookOpts...)
228 }
229
230 connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
231 if err != nil {
232 return nil, err
233 }
234 client := BigQueryReadClient{CallOptions: defaultBigQueryReadCallOptions()}
235
236 c := &bigQueryReadGRPCClient{
237 connPool: connPool,
238 bigQueryReadClient: storagepb.NewBigQueryReadClient(connPool),
239 CallOptions: &client.CallOptions,
240 }
241 c.setGoogleClientInfo()
242
243 client.internalClient = c
244
245 return &client, nil
246 }
247
248
249
250
251
252 func (c *bigQueryReadGRPCClient) Connection() *grpc.ClientConn {
253 return c.connPool.Conn()
254 }
255
256
257
258
259 func (c *bigQueryReadGRPCClient) setGoogleClientInfo(keyval ...string) {
260 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
261 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
262 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
263 }
264
265
266
267 func (c *bigQueryReadGRPCClient) Close() error {
268 return c.connPool.Close()
269 }
270
271 func (c *bigQueryReadGRPCClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
272 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_session.table", url.QueryEscape(req.GetReadSession().GetTable()))}
273
274 hds = append(c.xGoogHeaders, hds...)
275 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
276 opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
277 var resp *storagepb.ReadSession
278 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
279 var err error
280 resp, err = c.bigQueryReadClient.CreateReadSession(ctx, req, settings.GRPC...)
281 return err
282 }, opts...)
283 if err != nil {
284 return nil, err
285 }
286 return resp, nil
287 }
288
289 func (c *bigQueryReadGRPCClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
290 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_stream", url.QueryEscape(req.GetReadStream()))}
291
292 hds = append(c.xGoogHeaders, hds...)
293 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
294 opts = append((*c.CallOptions).ReadRows[0:len((*c.CallOptions).ReadRows):len((*c.CallOptions).ReadRows)], opts...)
295 var resp storagepb.BigQueryRead_ReadRowsClient
296 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
297 var err error
298 resp, err = c.bigQueryReadClient.ReadRows(ctx, req, settings.GRPC...)
299 return err
300 }, opts...)
301 if err != nil {
302 return nil, err
303 }
304 return resp, nil
305 }
306
307 func (c *bigQueryReadGRPCClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
308 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
309
310 hds = append(c.xGoogHeaders, hds...)
311 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
312 opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
313 var resp *storagepb.SplitReadStreamResponse
314 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
315 var err error
316 resp, err = c.bigQueryReadClient.SplitReadStream(ctx, req, settings.GRPC...)
317 return err
318 }, opts...)
319 if err != nil {
320 return nil, err
321 }
322 return resp, nil
323 }
324
View as plain text