1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package storage
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "math"
25 "net/http"
26 "net/url"
27 "time"
28
29 storagepb "cloud.google.com/go/bigquery/storage/apiv1beta2/storagepb"
30 gax "github.com/googleapis/gax-go/v2"
31 "google.golang.org/api/googleapi"
32 "google.golang.org/api/option"
33 "google.golang.org/api/option/internaloption"
34 gtransport "google.golang.org/api/transport/grpc"
35 httptransport "google.golang.org/api/transport/http"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/metadata"
39 "google.golang.org/protobuf/encoding/protojson"
40 )
41
42 var newBigQueryReadClientHook clientHook
43
44
45 type BigQueryReadCallOptions struct {
46 CreateReadSession []gax.CallOption
47 ReadRows []gax.CallOption
48 SplitReadStream []gax.CallOption
49 }
50
51 func defaultBigQueryReadGRPCClientOptions() []option.ClientOption {
52 return []option.ClientOption{
53 internaloption.WithDefaultEndpoint("bigquerystorage.googleapis.com:443"),
54 internaloption.WithDefaultEndpointTemplate("bigquerystorage.UNIVERSE_DOMAIN:443"),
55 internaloption.WithDefaultMTLSEndpoint("bigquerystorage.mtls.googleapis.com:443"),
56 internaloption.WithDefaultUniverseDomain("googleapis.com"),
57 internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
58 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
59 internaloption.EnableJwtWithScope(),
60 option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
61 grpc.MaxCallRecvMsgSize(math.MaxInt32))),
62 }
63 }
64
65 func defaultBigQueryReadCallOptions() *BigQueryReadCallOptions {
66 return &BigQueryReadCallOptions{
67 CreateReadSession: []gax.CallOption{
68 gax.WithTimeout(600000 * time.Millisecond),
69 gax.WithRetry(func() gax.Retryer {
70 return gax.OnCodes([]codes.Code{
71 codes.DeadlineExceeded,
72 codes.Unavailable,
73 }, gax.Backoff{
74 Initial: 100 * time.Millisecond,
75 Max: 60000 * time.Millisecond,
76 Multiplier: 1.30,
77 })
78 }),
79 },
80 ReadRows: []gax.CallOption{
81 gax.WithRetry(func() gax.Retryer {
82 return gax.OnCodes([]codes.Code{
83 codes.Unavailable,
84 }, gax.Backoff{
85 Initial: 100 * time.Millisecond,
86 Max: 60000 * time.Millisecond,
87 Multiplier: 1.30,
88 })
89 }),
90 },
91 SplitReadStream: []gax.CallOption{
92 gax.WithTimeout(600000 * time.Millisecond),
93 gax.WithRetry(func() gax.Retryer {
94 return gax.OnCodes([]codes.Code{
95 codes.DeadlineExceeded,
96 codes.Unavailable,
97 }, gax.Backoff{
98 Initial: 100 * time.Millisecond,
99 Max: 60000 * time.Millisecond,
100 Multiplier: 1.30,
101 })
102 }),
103 },
104 }
105 }
106
107 func defaultBigQueryReadRESTCallOptions() *BigQueryReadCallOptions {
108 return &BigQueryReadCallOptions{
109 CreateReadSession: []gax.CallOption{
110 gax.WithTimeout(600000 * time.Millisecond),
111 gax.WithRetry(func() gax.Retryer {
112 return gax.OnHTTPCodes(gax.Backoff{
113 Initial: 100 * time.Millisecond,
114 Max: 60000 * time.Millisecond,
115 Multiplier: 1.30,
116 },
117 http.StatusGatewayTimeout,
118 http.StatusServiceUnavailable)
119 }),
120 },
121 ReadRows: []gax.CallOption{
122 gax.WithTimeout(86400000 * time.Millisecond),
123 gax.WithRetry(func() gax.Retryer {
124 return gax.OnHTTPCodes(gax.Backoff{
125 Initial: 100 * time.Millisecond,
126 Max: 60000 * time.Millisecond,
127 Multiplier: 1.30,
128 },
129 http.StatusServiceUnavailable)
130 }),
131 },
132 SplitReadStream: []gax.CallOption{
133 gax.WithTimeout(600000 * time.Millisecond),
134 gax.WithRetry(func() gax.Retryer {
135 return gax.OnHTTPCodes(gax.Backoff{
136 Initial: 100 * time.Millisecond,
137 Max: 60000 * time.Millisecond,
138 Multiplier: 1.30,
139 },
140 http.StatusGatewayTimeout,
141 http.StatusServiceUnavailable)
142 }),
143 },
144 }
145 }
146
147
148 type internalBigQueryReadClient interface {
149 Close() error
150 setGoogleClientInfo(...string)
151 Connection() *grpc.ClientConn
152 CreateReadSession(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error)
153 ReadRows(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error)
154 SplitReadStream(context.Context, *storagepb.SplitReadStreamRequest, ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error)
155 }
156
157
158
159
160
161
162
163
164
165
166 type BigQueryReadClient struct {
167
168 internalClient internalBigQueryReadClient
169
170
171 CallOptions *BigQueryReadCallOptions
172 }
173
174
175
176
177
178 func (c *BigQueryReadClient) Close() error {
179 return c.internalClient.Close()
180 }
181
182
183
184
185 func (c *BigQueryReadClient) setGoogleClientInfo(keyval ...string) {
186 c.internalClient.setGoogleClientInfo(keyval...)
187 }
188
189
190
191
192
193 func (c *BigQueryReadClient) Connection() *grpc.ClientConn {
194 return c.internalClient.Connection()
195 }
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216 func (c *BigQueryReadClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
217 return c.internalClient.CreateReadSession(ctx, req, opts...)
218 }
219
220
221
222
223
224
225
226
227 func (c *BigQueryReadClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
228 return c.internalClient.ReadRows(ctx, req, opts...)
229 }
230
231
232
233
234
235
236
237
238
239
240
241
242
243 func (c *BigQueryReadClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
244 return c.internalClient.SplitReadStream(ctx, req, opts...)
245 }
246
247
248
249
250 type bigQueryReadGRPCClient struct {
251
252 connPool gtransport.ConnPool
253
254
255 CallOptions **BigQueryReadCallOptions
256
257
258 bigQueryReadClient storagepb.BigQueryReadClient
259
260
261 xGoogHeaders []string
262 }
263
264
265
266
267
268
269
270
271
272
273 func NewBigQueryReadClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryReadClient, error) {
274 clientOpts := defaultBigQueryReadGRPCClientOptions()
275 if newBigQueryReadClientHook != nil {
276 hookOpts, err := newBigQueryReadClientHook(ctx, clientHookParams{})
277 if err != nil {
278 return nil, err
279 }
280 clientOpts = append(clientOpts, hookOpts...)
281 }
282
283 connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
284 if err != nil {
285 return nil, err
286 }
287 client := BigQueryReadClient{CallOptions: defaultBigQueryReadCallOptions()}
288
289 c := &bigQueryReadGRPCClient{
290 connPool: connPool,
291 bigQueryReadClient: storagepb.NewBigQueryReadClient(connPool),
292 CallOptions: &client.CallOptions,
293 }
294 c.setGoogleClientInfo()
295
296 client.internalClient = c
297
298 return &client, nil
299 }
300
301
302
303
304
305 func (c *bigQueryReadGRPCClient) Connection() *grpc.ClientConn {
306 return c.connPool.Conn()
307 }
308
309
310
311
312 func (c *bigQueryReadGRPCClient) setGoogleClientInfo(keyval ...string) {
313 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
314 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
315 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
316 }
317
318
319
320 func (c *bigQueryReadGRPCClient) Close() error {
321 return c.connPool.Close()
322 }
323
324
325 type bigQueryReadRESTClient struct {
326
327 endpoint string
328
329
330 httpClient *http.Client
331
332
333 xGoogHeaders []string
334
335
336 CallOptions **BigQueryReadCallOptions
337 }
338
339
340
341
342
343
344
345
346
347 func NewBigQueryReadRESTClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryReadClient, error) {
348 clientOpts := append(defaultBigQueryReadRESTClientOptions(), opts...)
349 httpClient, endpoint, err := httptransport.NewClient(ctx, clientOpts...)
350 if err != nil {
351 return nil, err
352 }
353
354 callOpts := defaultBigQueryReadRESTCallOptions()
355 c := &bigQueryReadRESTClient{
356 endpoint: endpoint,
357 httpClient: httpClient,
358 CallOptions: &callOpts,
359 }
360 c.setGoogleClientInfo()
361
362 return &BigQueryReadClient{internalClient: c, CallOptions: callOpts}, nil
363 }
364
365 func defaultBigQueryReadRESTClientOptions() []option.ClientOption {
366 return []option.ClientOption{
367 internaloption.WithDefaultEndpoint("https://bigquerystorage.googleapis.com"),
368 internaloption.WithDefaultEndpointTemplate("https://bigquerystorage.UNIVERSE_DOMAIN"),
369 internaloption.WithDefaultMTLSEndpoint("https://bigquerystorage.mtls.googleapis.com"),
370 internaloption.WithDefaultUniverseDomain("googleapis.com"),
371 internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
372 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
373 }
374 }
375
376
377
378
379 func (c *bigQueryReadRESTClient) setGoogleClientInfo(keyval ...string) {
380 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
381 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "rest", "UNKNOWN")
382 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
383 }
384
385
386
387 func (c *bigQueryReadRESTClient) Close() error {
388
389 c.httpClient = nil
390 return nil
391 }
392
393
394
395
396 func (c *bigQueryReadRESTClient) Connection() *grpc.ClientConn {
397 return nil
398 }
399 func (c *bigQueryReadGRPCClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
400 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_session.table", url.QueryEscape(req.GetReadSession().GetTable()))}
401
402 hds = append(c.xGoogHeaders, hds...)
403 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
404 opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
405 var resp *storagepb.ReadSession
406 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
407 var err error
408 resp, err = c.bigQueryReadClient.CreateReadSession(ctx, req, settings.GRPC...)
409 return err
410 }, opts...)
411 if err != nil {
412 return nil, err
413 }
414 return resp, nil
415 }
416
417 func (c *bigQueryReadGRPCClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
418 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_stream", url.QueryEscape(req.GetReadStream()))}
419
420 hds = append(c.xGoogHeaders, hds...)
421 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
422 opts = append((*c.CallOptions).ReadRows[0:len((*c.CallOptions).ReadRows):len((*c.CallOptions).ReadRows)], opts...)
423 var resp storagepb.BigQueryRead_ReadRowsClient
424 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
425 var err error
426 resp, err = c.bigQueryReadClient.ReadRows(ctx, req, settings.GRPC...)
427 return err
428 }, opts...)
429 if err != nil {
430 return nil, err
431 }
432 return resp, nil
433 }
434
435 func (c *bigQueryReadGRPCClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
436 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
437
438 hds = append(c.xGoogHeaders, hds...)
439 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
440 opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
441 var resp *storagepb.SplitReadStreamResponse
442 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
443 var err error
444 resp, err = c.bigQueryReadClient.SplitReadStream(ctx, req, settings.GRPC...)
445 return err
446 }, opts...)
447 if err != nil {
448 return nil, err
449 }
450 return resp, nil
451 }
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472 func (c *bigQueryReadRESTClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
473 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
474 jsonReq, err := m.Marshal(req)
475 if err != nil {
476 return nil, err
477 }
478
479 baseUrl, err := url.Parse(c.endpoint)
480 if err != nil {
481 return nil, err
482 }
483 baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetReadSession().GetTable())
484
485
486 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_session.table", url.QueryEscape(req.GetReadSession().GetTable()))}
487
488 hds = append(c.xGoogHeaders, hds...)
489 hds = append(hds, "Content-Type", "application/json")
490 headers := gax.BuildHeaders(ctx, hds...)
491 opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
492 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
493 resp := &storagepb.ReadSession{}
494 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
495 if settings.Path != "" {
496 baseUrl.Path = settings.Path
497 }
498 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
499 if err != nil {
500 return err
501 }
502 httpReq = httpReq.WithContext(ctx)
503 httpReq.Header = headers
504
505 httpRsp, err := c.httpClient.Do(httpReq)
506 if err != nil {
507 return err
508 }
509 defer httpRsp.Body.Close()
510
511 if err = googleapi.CheckResponse(httpRsp); err != nil {
512 return err
513 }
514
515 buf, err := io.ReadAll(httpRsp.Body)
516 if err != nil {
517 return err
518 }
519
520 if err := unm.Unmarshal(buf, resp); err != nil {
521 return err
522 }
523
524 return nil
525 }, opts...)
526 if e != nil {
527 return nil, e
528 }
529 return resp, nil
530 }
531
532
533
534
535
536
537
538
539 func (c *bigQueryReadRESTClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
540 baseUrl, err := url.Parse(c.endpoint)
541 if err != nil {
542 return nil, err
543 }
544 baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetReadStream())
545
546 params := url.Values{}
547 if req.GetOffset() != 0 {
548 params.Add("offset", fmt.Sprintf("%v", req.GetOffset()))
549 }
550
551 baseUrl.RawQuery = params.Encode()
552
553
554 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_stream", url.QueryEscape(req.GetReadStream()))}
555
556 hds = append(c.xGoogHeaders, hds...)
557 hds = append(hds, "Content-Type", "application/json")
558 headers := gax.BuildHeaders(ctx, hds...)
559 var streamClient *readRowsRESTClient
560 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
561 if settings.Path != "" {
562 baseUrl.Path = settings.Path
563 }
564 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
565 if err != nil {
566 return err
567 }
568 httpReq = httpReq.WithContext(ctx)
569 httpReq.Header = headers
570
571 httpRsp, err := c.httpClient.Do(httpReq)
572 if err != nil {
573 return err
574 }
575
576 if err = googleapi.CheckResponse(httpRsp); err != nil {
577 return err
578 }
579
580 streamClient = &readRowsRESTClient{
581 ctx: ctx,
582 md: metadata.MD(httpRsp.Header),
583 stream: gax.NewProtoJSONStreamReader(httpRsp.Body, (&storagepb.ReadRowsResponse{}).ProtoReflect().Type()),
584 }
585 return nil
586 }, opts...)
587
588 return streamClient, e
589 }
590
591
592
593 type readRowsRESTClient struct {
594 ctx context.Context
595 md metadata.MD
596 stream *gax.ProtoJSONStream
597 }
598
599 func (c *readRowsRESTClient) Recv() (*storagepb.ReadRowsResponse, error) {
600 if err := c.ctx.Err(); err != nil {
601 defer c.stream.Close()
602 return nil, err
603 }
604 msg, err := c.stream.Recv()
605 if err != nil {
606 defer c.stream.Close()
607 return nil, err
608 }
609 res := msg.(*storagepb.ReadRowsResponse)
610 return res, nil
611 }
612
613 func (c *readRowsRESTClient) Header() (metadata.MD, error) {
614 return c.md, nil
615 }
616
617 func (c *readRowsRESTClient) Trailer() metadata.MD {
618 return c.md
619 }
620
621 func (c *readRowsRESTClient) CloseSend() error {
622
623 return fmt.Errorf("this method is not implemented for a server-stream")
624 }
625
626 func (c *readRowsRESTClient) Context() context.Context {
627 return c.ctx
628 }
629
630 func (c *readRowsRESTClient) SendMsg(m interface{}) error {
631
632 return fmt.Errorf("this method is not implemented for a server-stream")
633 }
634
635 func (c *readRowsRESTClient) RecvMsg(m interface{}) error {
636
637 return fmt.Errorf("this method is not implemented, use Recv")
638 }
639
640
641
642
643
644
645
646
647
648
649
650
651
652 func (c *bigQueryReadRESTClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
653 baseUrl, err := url.Parse(c.endpoint)
654 if err != nil {
655 return nil, err
656 }
657 baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetName())
658
659 params := url.Values{}
660 if req.GetFraction() != 0 {
661 params.Add("fraction", fmt.Sprintf("%v", req.GetFraction()))
662 }
663
664 baseUrl.RawQuery = params.Encode()
665
666
667 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
668
669 hds = append(c.xGoogHeaders, hds...)
670 hds = append(hds, "Content-Type", "application/json")
671 headers := gax.BuildHeaders(ctx, hds...)
672 opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
673 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
674 resp := &storagepb.SplitReadStreamResponse{}
675 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
676 if settings.Path != "" {
677 baseUrl.Path = settings.Path
678 }
679 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
680 if err != nil {
681 return err
682 }
683 httpReq = httpReq.WithContext(ctx)
684 httpReq.Header = headers
685
686 httpRsp, err := c.httpClient.Do(httpReq)
687 if err != nil {
688 return err
689 }
690 defer httpRsp.Body.Close()
691
692 if err = googleapi.CheckResponse(httpRsp); err != nil {
693 return err
694 }
695
696 buf, err := io.ReadAll(httpRsp.Body)
697 if err != nil {
698 return err
699 }
700
701 if err := unm.Unmarshal(buf, resp); err != nil {
702 return err
703 }
704
705 return nil
706 }, opts...)
707 if e != nil {
708 return nil, e
709 }
710 return resp, nil
711 }
712
View as plain text