1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package storage
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "math"
25 "net/http"
26 "net/url"
27 "time"
28
29 storagepb "cloud.google.com/go/bigquery/storage/apiv1beta1/storagepb"
30 gax "github.com/googleapis/gax-go/v2"
31 "google.golang.org/api/googleapi"
32 "google.golang.org/api/option"
33 "google.golang.org/api/option/internaloption"
34 gtransport "google.golang.org/api/transport/grpc"
35 httptransport "google.golang.org/api/transport/http"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/metadata"
39 "google.golang.org/protobuf/encoding/protojson"
40 )
41
// newBigQueryStorageClientHook, when non-nil, is called by
// NewBigQueryStorageClient to obtain extra client options. Those options are
// appended after the package defaults and before the caller's own options.
var newBigQueryStorageClientHook clientHook
43
44
// BigQueryStorageCallOptions holds the gax.CallOption lists used as per-method
// defaults, one field per RPC of BigQueryStorageClient.
type BigQueryStorageCallOptions struct {
	CreateReadSession             []gax.CallOption
	ReadRows                      []gax.CallOption
	BatchCreateReadSessionStreams []gax.CallOption
	FinalizeStream                []gax.CallOption
	SplitReadStream               []gax.CallOption
}
52
53 func defaultBigQueryStorageGRPCClientOptions() []option.ClientOption {
54 return []option.ClientOption{
55 internaloption.WithDefaultEndpoint("bigquerystorage.googleapis.com:443"),
56 internaloption.WithDefaultEndpointTemplate("bigquerystorage.UNIVERSE_DOMAIN:443"),
57 internaloption.WithDefaultMTLSEndpoint("bigquerystorage.mtls.googleapis.com:443"),
58 internaloption.WithDefaultUniverseDomain("googleapis.com"),
59 internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
60 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
61 internaloption.EnableJwtWithScope(),
62 option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
63 grpc.MaxCallRecvMsgSize(math.MaxInt32))),
64 }
65 }
66
67 func defaultBigQueryStorageCallOptions() *BigQueryStorageCallOptions {
68 return &BigQueryStorageCallOptions{
69 CreateReadSession: []gax.CallOption{
70 gax.WithTimeout(600000 * time.Millisecond),
71 gax.WithRetry(func() gax.Retryer {
72 return gax.OnCodes([]codes.Code{
73 codes.DeadlineExceeded,
74 codes.Unavailable,
75 }, gax.Backoff{
76 Initial: 100 * time.Millisecond,
77 Max: 60000 * time.Millisecond,
78 Multiplier: 1.30,
79 })
80 }),
81 },
82 ReadRows: []gax.CallOption{
83 gax.WithRetry(func() gax.Retryer {
84 return gax.OnCodes([]codes.Code{
85 codes.Unavailable,
86 }, gax.Backoff{
87 Initial: 100 * time.Millisecond,
88 Max: 60000 * time.Millisecond,
89 Multiplier: 1.30,
90 })
91 }),
92 },
93 BatchCreateReadSessionStreams: []gax.CallOption{
94 gax.WithTimeout(600000 * time.Millisecond),
95 gax.WithRetry(func() gax.Retryer {
96 return gax.OnCodes([]codes.Code{
97 codes.DeadlineExceeded,
98 codes.Unavailable,
99 }, gax.Backoff{
100 Initial: 100 * time.Millisecond,
101 Max: 60000 * time.Millisecond,
102 Multiplier: 1.30,
103 })
104 }),
105 },
106 FinalizeStream: []gax.CallOption{
107 gax.WithTimeout(600000 * time.Millisecond),
108 gax.WithRetry(func() gax.Retryer {
109 return gax.OnCodes([]codes.Code{
110 codes.DeadlineExceeded,
111 codes.Unavailable,
112 }, gax.Backoff{
113 Initial: 100 * time.Millisecond,
114 Max: 60000 * time.Millisecond,
115 Multiplier: 1.30,
116 })
117 }),
118 },
119 SplitReadStream: []gax.CallOption{
120 gax.WithTimeout(600000 * time.Millisecond),
121 gax.WithRetry(func() gax.Retryer {
122 return gax.OnCodes([]codes.Code{
123 codes.DeadlineExceeded,
124 codes.Unavailable,
125 }, gax.Backoff{
126 Initial: 100 * time.Millisecond,
127 Max: 60000 * time.Millisecond,
128 Multiplier: 1.30,
129 })
130 }),
131 },
132 }
133 }
134
135 func defaultBigQueryStorageRESTCallOptions() *BigQueryStorageCallOptions {
136 return &BigQueryStorageCallOptions{
137 CreateReadSession: []gax.CallOption{
138 gax.WithTimeout(600000 * time.Millisecond),
139 gax.WithRetry(func() gax.Retryer {
140 return gax.OnHTTPCodes(gax.Backoff{
141 Initial: 100 * time.Millisecond,
142 Max: 60000 * time.Millisecond,
143 Multiplier: 1.30,
144 },
145 http.StatusGatewayTimeout,
146 http.StatusServiceUnavailable)
147 }),
148 },
149 ReadRows: []gax.CallOption{
150 gax.WithTimeout(86400000 * time.Millisecond),
151 gax.WithRetry(func() gax.Retryer {
152 return gax.OnHTTPCodes(gax.Backoff{
153 Initial: 100 * time.Millisecond,
154 Max: 60000 * time.Millisecond,
155 Multiplier: 1.30,
156 },
157 http.StatusServiceUnavailable)
158 }),
159 },
160 BatchCreateReadSessionStreams: []gax.CallOption{
161 gax.WithTimeout(600000 * time.Millisecond),
162 gax.WithRetry(func() gax.Retryer {
163 return gax.OnHTTPCodes(gax.Backoff{
164 Initial: 100 * time.Millisecond,
165 Max: 60000 * time.Millisecond,
166 Multiplier: 1.30,
167 },
168 http.StatusGatewayTimeout,
169 http.StatusServiceUnavailable)
170 }),
171 },
172 FinalizeStream: []gax.CallOption{
173 gax.WithTimeout(600000 * time.Millisecond),
174 gax.WithRetry(func() gax.Retryer {
175 return gax.OnHTTPCodes(gax.Backoff{
176 Initial: 100 * time.Millisecond,
177 Max: 60000 * time.Millisecond,
178 Multiplier: 1.30,
179 },
180 http.StatusGatewayTimeout,
181 http.StatusServiceUnavailable)
182 }),
183 },
184 SplitReadStream: []gax.CallOption{
185 gax.WithTimeout(600000 * time.Millisecond),
186 gax.WithRetry(func() gax.Retryer {
187 return gax.OnHTTPCodes(gax.Backoff{
188 Initial: 100 * time.Millisecond,
189 Max: 60000 * time.Millisecond,
190 Multiplier: 1.30,
191 },
192 http.StatusGatewayTimeout,
193 http.StatusServiceUnavailable)
194 }),
195 },
196 }
197 }
198
199
// internalBigQueryStorageClient is the transport-level interface that
// BigQueryStorageClient delegates to. The gRPC and REST clients in this file
// both implement it.
type internalBigQueryStorageClient interface {
	Close() error
	setGoogleClientInfo(...string)
	Connection() *grpc.ClientConn
	CreateReadSession(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error)
	ReadRows(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryStorage_ReadRowsClient, error)
	BatchCreateReadSessionStreams(context.Context, *storagepb.BatchCreateReadSessionStreamsRequest, ...gax.CallOption) (*storagepb.BatchCreateReadSessionStreamsResponse, error)
	FinalizeStream(context.Context, *storagepb.FinalizeStreamRequest, ...gax.CallOption) error
	SplitReadStream(context.Context, *storagepb.SplitReadStreamRequest, ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error)
}
210
211
212
213
214
215
216
217
218
219
220
221
// BigQueryStorageClient is the public client for the BigQuery Storage RPCs
// declared in this file. Each of its methods forwards to internalClient.
type BigQueryStorageClient struct {
	// internalClient is the transport-specific implementation (gRPC or REST)
	// that performs the calls.
	internalClient internalBigQueryStorageClient

	// CallOptions holds the per-method default call options.
	CallOptions *BigQueryStorageCallOptions
}
229
230
231
232
233
// Close closes the underlying transport client and returns whatever error it
// reports.
func (c *BigQueryStorageClient) Close() error {
	return c.internalClient.Close()
}
237
238
239
240
// setGoogleClientInfo forwards the given key/value strings to the underlying
// transport client, which uses them to rebuild its "x-goog-api-client" header.
func (c *BigQueryStorageClient) setGoogleClientInfo(keyval ...string) {
	c.internalClient.setGoogleClientInfo(keyval...)
}
244
245
246
247
248
// Connection returns the gRPC connection reported by the underlying transport
// client. The REST transport in this file always reports nil.
func (c *BigQueryStorageClient) Connection() *grpc.ClientConn {
	return c.internalClient.Connection()
}
252
253
254
255
256
257
258
259
260
261
262
263
264
// CreateReadSession forwards the CreateReadSession RPC to the underlying
// transport client, passing ctx, req and the per-call opts through unchanged,
// and returns the resulting read session or error.
func (c *BigQueryStorageClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
	return c.internalClient.CreateReadSession(ctx, req, opts...)
}
268
269
270
271
272
273
274
275
276
277
// ReadRows forwards the ReadRows RPC to the underlying transport client and
// returns the stream client it produces, from which responses are received.
func (c *BigQueryStorageClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryStorage_ReadRowsClient, error) {
	return c.internalClient.ReadRows(ctx, req, opts...)
}
281
282
283
284
// BatchCreateReadSessionStreams forwards the BatchCreateReadSessionStreams RPC
// to the underlying transport client and returns its response or error.
func (c *BigQueryStorageClient) BatchCreateReadSessionStreams(ctx context.Context, req *storagepb.BatchCreateReadSessionStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCreateReadSessionStreamsResponse, error) {
	return c.internalClient.BatchCreateReadSessionStreams(ctx, req, opts...)
}
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
// FinalizeStream forwards the FinalizeStream RPC to the underlying transport
// client. The RPC has no response payload; only the error is returned.
func (c *BigQueryStorageClient) FinalizeStream(ctx context.Context, req *storagepb.FinalizeStreamRequest, opts ...gax.CallOption) error {
	return c.internalClient.FinalizeStream(ctx, req, opts...)
}
306
307
308
309
310
311
312
313
314
315
316
317
318
319
// SplitReadStream forwards the SplitReadStream RPC to the underlying transport
// client and returns its response or error.
func (c *BigQueryStorageClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
	return c.internalClient.SplitReadStream(ctx, req, opts...)
}
323
324
325
326
// bigQueryStorageGRPCClient is the gRPC-backed implementation of
// internalBigQueryStorageClient.
type bigQueryStorageGRPCClient struct {
	// connPool is the gRPC connection pool the stub is bound to; Close closes it.
	connPool gtransport.ConnPool

	// CallOptions points at the CallOptions field of the wrapping
	// BigQueryStorageClient, so the options are looked up at call time.
	CallOptions **BigQueryStorageCallOptions

	// bigQueryStorageClient is the generated gRPC stub that issues the RPCs.
	bigQueryStorageClient storagepb.BigQueryStorageClient

	// xGoogHeaders holds the "x-goog-api-client" header key and value that are
	// attached to every outgoing call.
	xGoogHeaders []string
}
340
341
342
343
344
345
346
347
348
349
350
351
352 func NewBigQueryStorageClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryStorageClient, error) {
353 clientOpts := defaultBigQueryStorageGRPCClientOptions()
354 if newBigQueryStorageClientHook != nil {
355 hookOpts, err := newBigQueryStorageClientHook(ctx, clientHookParams{})
356 if err != nil {
357 return nil, err
358 }
359 clientOpts = append(clientOpts, hookOpts...)
360 }
361
362 connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
363 if err != nil {
364 return nil, err
365 }
366 client := BigQueryStorageClient{CallOptions: defaultBigQueryStorageCallOptions()}
367
368 c := &bigQueryStorageGRPCClient{
369 connPool: connPool,
370 bigQueryStorageClient: storagepb.NewBigQueryStorageClient(connPool),
371 CallOptions: &client.CallOptions,
372 }
373 c.setGoogleClientInfo()
374
375 client.internalClient = c
376
377 return &client, nil
378 }
379
380
381
382
383
// Connection returns a gRPC connection obtained from the client's connection
// pool via connPool.Conn().
func (c *bigQueryStorageGRPCClient) Connection() *grpc.ClientConn {
	return c.connPool.Conn()
}
387
388
389
390
391 func (c *bigQueryStorageGRPCClient) setGoogleClientInfo(keyval ...string) {
392 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
393 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
394 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
395 }
396
397
398
// Close closes the gRPC connection pool and returns the error it reports.
func (c *bigQueryStorageGRPCClient) Close() error {
	return c.connPool.Close()
}
402
403
// bigQueryStorageRESTClient is the HTTP/JSON-backed implementation of
// internalBigQueryStorageClient.
type bigQueryStorageRESTClient struct {
	// endpoint is the base URL that each method parses and appends its
	// request path to.
	endpoint string

	// httpClient executes the HTTP requests. Close sets it to nil.
	httpClient *http.Client

	// xGoogHeaders holds the "x-goog-api-client" header key and value that are
	// attached to every outgoing request.
	xGoogHeaders []string

	// CallOptions is a pointer to the pointer holding the per-method default
	// call options.
	CallOptions **BigQueryStorageCallOptions
}
417
418
419
420
421
422
423
424
425
426
427
428 func NewBigQueryStorageRESTClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryStorageClient, error) {
429 clientOpts := append(defaultBigQueryStorageRESTClientOptions(), opts...)
430 httpClient, endpoint, err := httptransport.NewClient(ctx, clientOpts...)
431 if err != nil {
432 return nil, err
433 }
434
435 callOpts := defaultBigQueryStorageRESTCallOptions()
436 c := &bigQueryStorageRESTClient{
437 endpoint: endpoint,
438 httpClient: httpClient,
439 CallOptions: &callOpts,
440 }
441 c.setGoogleClientInfo()
442
443 return &BigQueryStorageClient{internalClient: c, CallOptions: callOpts}, nil
444 }
445
446 func defaultBigQueryStorageRESTClientOptions() []option.ClientOption {
447 return []option.ClientOption{
448 internaloption.WithDefaultEndpoint("https://bigquerystorage.googleapis.com"),
449 internaloption.WithDefaultEndpointTemplate("https://bigquerystorage.UNIVERSE_DOMAIN"),
450 internaloption.WithDefaultMTLSEndpoint("https://bigquerystorage.mtls.googleapis.com"),
451 internaloption.WithDefaultUniverseDomain("googleapis.com"),
452 internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
453 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
454 }
455 }
456
457
458
459
460 func (c *bigQueryStorageRESTClient) setGoogleClientInfo(keyval ...string) {
461 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
462 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "rest", "UNKNOWN")
463 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
464 }
465
466
467
// Close releases the client's reference to the HTTP client and always returns
// nil.
func (c *bigQueryStorageRESTClient) Close() error {
	// Drop the reference only; nothing is closed explicitly. RPC methods call
	// c.httpClient.Do, so using this client after Close dereferences nil.
	c.httpClient = nil
	return nil
}
473
474
475
476
// Connection always returns nil: the REST transport has no gRPC connection.
// It exists to satisfy internalBigQueryStorageClient.
func (c *bigQueryStorageRESTClient) Connection() *grpc.ClientConn {
	return nil
}
480 func (c *bigQueryStorageGRPCClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
481 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v&%s=%v", "table_reference.project_id", url.QueryEscape(req.GetTableReference().GetProjectId()), "table_reference.dataset_id", url.QueryEscape(req.GetTableReference().GetDatasetId()))}
482
483 hds = append(c.xGoogHeaders, hds...)
484 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
485 opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
486 var resp *storagepb.ReadSession
487 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
488 var err error
489 resp, err = c.bigQueryStorageClient.CreateReadSession(ctx, req, settings.GRPC...)
490 return err
491 }, opts...)
492 if err != nil {
493 return nil, err
494 }
495 return resp, nil
496 }
497
498 func (c *bigQueryStorageGRPCClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryStorage_ReadRowsClient, error) {
499 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_position.stream.name", url.QueryEscape(req.GetReadPosition().GetStream().GetName()))}
500
501 hds = append(c.xGoogHeaders, hds...)
502 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
503 opts = append((*c.CallOptions).ReadRows[0:len((*c.CallOptions).ReadRows):len((*c.CallOptions).ReadRows)], opts...)
504 var resp storagepb.BigQueryStorage_ReadRowsClient
505 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
506 var err error
507 resp, err = c.bigQueryStorageClient.ReadRows(ctx, req, settings.GRPC...)
508 return err
509 }, opts...)
510 if err != nil {
511 return nil, err
512 }
513 return resp, nil
514 }
515
516 func (c *bigQueryStorageGRPCClient) BatchCreateReadSessionStreams(ctx context.Context, req *storagepb.BatchCreateReadSessionStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCreateReadSessionStreamsResponse, error) {
517 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "session.name", url.QueryEscape(req.GetSession().GetName()))}
518
519 hds = append(c.xGoogHeaders, hds...)
520 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
521 opts = append((*c.CallOptions).BatchCreateReadSessionStreams[0:len((*c.CallOptions).BatchCreateReadSessionStreams):len((*c.CallOptions).BatchCreateReadSessionStreams)], opts...)
522 var resp *storagepb.BatchCreateReadSessionStreamsResponse
523 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
524 var err error
525 resp, err = c.bigQueryStorageClient.BatchCreateReadSessionStreams(ctx, req, settings.GRPC...)
526 return err
527 }, opts...)
528 if err != nil {
529 return nil, err
530 }
531 return resp, nil
532 }
533
534 func (c *bigQueryStorageGRPCClient) FinalizeStream(ctx context.Context, req *storagepb.FinalizeStreamRequest, opts ...gax.CallOption) error {
535 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "stream.name", url.QueryEscape(req.GetStream().GetName()))}
536
537 hds = append(c.xGoogHeaders, hds...)
538 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
539 opts = append((*c.CallOptions).FinalizeStream[0:len((*c.CallOptions).FinalizeStream):len((*c.CallOptions).FinalizeStream)], opts...)
540 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
541 var err error
542 _, err = c.bigQueryStorageClient.FinalizeStream(ctx, req, settings.GRPC...)
543 return err
544 }, opts...)
545 return err
546 }
547
548 func (c *bigQueryStorageGRPCClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
549 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "original_stream.name", url.QueryEscape(req.GetOriginalStream().GetName()))}
550
551 hds = append(c.xGoogHeaders, hds...)
552 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
553 opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
554 var resp *storagepb.SplitReadStreamResponse
555 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
556 var err error
557 resp, err = c.bigQueryStorageClient.SplitReadStream(ctx, req, settings.GRPC...)
558 return err
559 }, opts...)
560 if err != nil {
561 return nil, err
562 }
563 return resp, nil
564 }
565
566
567
568
569
570
571
572
573
574
575
576
577
578 func (c *bigQueryStorageRESTClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
579 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
580 jsonReq, err := m.Marshal(req)
581 if err != nil {
582 return nil, err
583 }
584
585 baseUrl, err := url.Parse(c.endpoint)
586 if err != nil {
587 return nil, err
588 }
589 baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetTableReference().GetProjectId())
590
591
592 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v&%s=%v", "table_reference.project_id", url.QueryEscape(req.GetTableReference().GetProjectId()), "table_reference.dataset_id", url.QueryEscape(req.GetTableReference().GetDatasetId()))}
593
594 hds = append(c.xGoogHeaders, hds...)
595 hds = append(hds, "Content-Type", "application/json")
596 headers := gax.BuildHeaders(ctx, hds...)
597 opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
598 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
599 resp := &storagepb.ReadSession{}
600 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
601 if settings.Path != "" {
602 baseUrl.Path = settings.Path
603 }
604 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
605 if err != nil {
606 return err
607 }
608 httpReq = httpReq.WithContext(ctx)
609 httpReq.Header = headers
610
611 httpRsp, err := c.httpClient.Do(httpReq)
612 if err != nil {
613 return err
614 }
615 defer httpRsp.Body.Close()
616
617 if err = googleapi.CheckResponse(httpRsp); err != nil {
618 return err
619 }
620
621 buf, err := io.ReadAll(httpRsp.Body)
622 if err != nil {
623 return err
624 }
625
626 if err := unm.Unmarshal(buf, resp); err != nil {
627 return err
628 }
629
630 return nil
631 }, opts...)
632 if e != nil {
633 return nil, e
634 }
635 return resp, nil
636 }
637
638
639
640
641
642
643
644
645
646
647 func (c *bigQueryStorageRESTClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryStorage_ReadRowsClient, error) {
648 baseUrl, err := url.Parse(c.endpoint)
649 if err != nil {
650 return nil, err
651 }
652 baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetReadPosition().GetStream().GetName())
653
654 params := url.Values{}
655 if req.GetReadPosition().GetOffset() != 0 {
656 params.Add("readPosition.offset", fmt.Sprintf("%v", req.GetReadPosition().GetOffset()))
657 }
658
659 baseUrl.RawQuery = params.Encode()
660
661
662 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_position.stream.name", url.QueryEscape(req.GetReadPosition().GetStream().GetName()))}
663
664 hds = append(c.xGoogHeaders, hds...)
665 hds = append(hds, "Content-Type", "application/json")
666 headers := gax.BuildHeaders(ctx, hds...)
667 var streamClient *readRowsRESTClient
668 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
669 if settings.Path != "" {
670 baseUrl.Path = settings.Path
671 }
672 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
673 if err != nil {
674 return err
675 }
676 httpReq = httpReq.WithContext(ctx)
677 httpReq.Header = headers
678
679 httpRsp, err := c.httpClient.Do(httpReq)
680 if err != nil {
681 return err
682 }
683
684 if err = googleapi.CheckResponse(httpRsp); err != nil {
685 return err
686 }
687
688 streamClient = &readRowsRESTClient{
689 ctx: ctx,
690 md: metadata.MD(httpRsp.Header),
691 stream: gax.NewProtoJSONStreamReader(httpRsp.Body, (&storagepb.ReadRowsResponse{}).ProtoReflect().Type()),
692 }
693 return nil
694 }, opts...)
695
696 return streamClient, e
697 }
698
699
700
// readRowsRESTClient is the stream client returned by the REST ReadRows
// method. It decodes ReadRowsResponse messages from an HTTP response body.
type readRowsRESTClient struct {
	// ctx is the context the stream was opened with; Recv checks it before
	// every read.
	ctx context.Context
	// md is the HTTP response header set, exposed as gRPC metadata.
	md metadata.MD
	// stream decodes messages from the HTTP response body.
	stream *gax.ProtoJSONStream
}
706
707 func (c *readRowsRESTClient) Recv() (*storagepb.ReadRowsResponse, error) {
708 if err := c.ctx.Err(); err != nil {
709 defer c.stream.Close()
710 return nil, err
711 }
712 msg, err := c.stream.Recv()
713 if err != nil {
714 defer c.stream.Close()
715 return nil, err
716 }
717 res := msg.(*storagepb.ReadRowsResponse)
718 return res, nil
719 }
720
// Header returns the metadata captured from the HTTP response headers when the
// stream was opened. The error is always nil.
func (c *readRowsRESTClient) Header() (metadata.MD, error) {
	return c.md, nil
}
724
// Trailer returns the same metadata as Header; no separate trailer metadata is
// tracked.
func (c *readRowsRESTClient) Trailer() metadata.MD {
	return c.md
}
728
// CloseSend always returns an error: this client represents a server stream
// and has no send side to close.
func (c *readRowsRESTClient) CloseSend() error {
	// This is a no-op to fulfill the interface.
	return fmt.Errorf("this method is not implemented for a server-stream")
}
733
// Context returns the context the stream was opened with.
func (c *readRowsRESTClient) Context() context.Context {
	return c.ctx
}
737
// SendMsg always returns an error and ignores m: this client represents a
// server stream and cannot send messages.
func (c *readRowsRESTClient) SendMsg(m interface{}) error {
	// This is a no-op to fulfill the interface.
	return fmt.Errorf("this method is not implemented for a server-stream")
}
742
// RecvMsg always returns an error and ignores m; callers should use Recv to
// read from the stream.
func (c *readRowsRESTClient) RecvMsg(m interface{}) error {
	// This is a no-op to fulfill the interface.
	return fmt.Errorf("this method is not implemented, use Recv")
}
747
748
749
750
751 func (c *bigQueryStorageRESTClient) BatchCreateReadSessionStreams(ctx context.Context, req *storagepb.BatchCreateReadSessionStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCreateReadSessionStreamsResponse, error) {
752 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
753 jsonReq, err := m.Marshal(req)
754 if err != nil {
755 return nil, err
756 }
757
758 baseUrl, err := url.Parse(c.endpoint)
759 if err != nil {
760 return nil, err
761 }
762 baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetSession().GetName())
763
764
765 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "session.name", url.QueryEscape(req.GetSession().GetName()))}
766
767 hds = append(c.xGoogHeaders, hds...)
768 hds = append(hds, "Content-Type", "application/json")
769 headers := gax.BuildHeaders(ctx, hds...)
770 opts = append((*c.CallOptions).BatchCreateReadSessionStreams[0:len((*c.CallOptions).BatchCreateReadSessionStreams):len((*c.CallOptions).BatchCreateReadSessionStreams)], opts...)
771 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
772 resp := &storagepb.BatchCreateReadSessionStreamsResponse{}
773 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
774 if settings.Path != "" {
775 baseUrl.Path = settings.Path
776 }
777 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
778 if err != nil {
779 return err
780 }
781 httpReq = httpReq.WithContext(ctx)
782 httpReq.Header = headers
783
784 httpRsp, err := c.httpClient.Do(httpReq)
785 if err != nil {
786 return err
787 }
788 defer httpRsp.Body.Close()
789
790 if err = googleapi.CheckResponse(httpRsp); err != nil {
791 return err
792 }
793
794 buf, err := io.ReadAll(httpRsp.Body)
795 if err != nil {
796 return err
797 }
798
799 if err := unm.Unmarshal(buf, resp); err != nil {
800 return err
801 }
802
803 return nil
804 }, opts...)
805 if e != nil {
806 return nil, e
807 }
808 return resp, nil
809 }
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825 func (c *bigQueryStorageRESTClient) FinalizeStream(ctx context.Context, req *storagepb.FinalizeStreamRequest, opts ...gax.CallOption) error {
826 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
827 jsonReq, err := m.Marshal(req)
828 if err != nil {
829 return err
830 }
831
832 baseUrl, err := url.Parse(c.endpoint)
833 if err != nil {
834 return err
835 }
836 baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetStream().GetName())
837
838
839 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "stream.name", url.QueryEscape(req.GetStream().GetName()))}
840
841 hds = append(c.xGoogHeaders, hds...)
842 hds = append(hds, "Content-Type", "application/json")
843 headers := gax.BuildHeaders(ctx, hds...)
844 return gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
845 if settings.Path != "" {
846 baseUrl.Path = settings.Path
847 }
848 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
849 if err != nil {
850 return err
851 }
852 httpReq = httpReq.WithContext(ctx)
853 httpReq.Header = headers
854
855 httpRsp, err := c.httpClient.Do(httpReq)
856 if err != nil {
857 return err
858 }
859 defer httpRsp.Body.Close()
860
861
862
863 return googleapi.CheckResponse(httpRsp)
864 }, opts...)
865 }
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880 func (c *bigQueryStorageRESTClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
881 baseUrl, err := url.Parse(c.endpoint)
882 if err != nil {
883 return nil, err
884 }
885 baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetOriginalStream().GetName())
886
887 params := url.Values{}
888 if req.GetFraction() != 0 {
889 params.Add("fraction", fmt.Sprintf("%v", req.GetFraction()))
890 }
891
892 baseUrl.RawQuery = params.Encode()
893
894
895 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "original_stream.name", url.QueryEscape(req.GetOriginalStream().GetName()))}
896
897 hds = append(c.xGoogHeaders, hds...)
898 hds = append(hds, "Content-Type", "application/json")
899 headers := gax.BuildHeaders(ctx, hds...)
900 opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
901 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
902 resp := &storagepb.SplitReadStreamResponse{}
903 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
904 if settings.Path != "" {
905 baseUrl.Path = settings.Path
906 }
907 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
908 if err != nil {
909 return err
910 }
911 httpReq = httpReq.WithContext(ctx)
912 httpReq.Header = headers
913
914 httpRsp, err := c.httpClient.Do(httpReq)
915 if err != nil {
916 return err
917 }
918 defer httpRsp.Body.Close()
919
920 if err = googleapi.CheckResponse(httpRsp); err != nil {
921 return err
922 }
923
924 buf, err := io.ReadAll(httpRsp.Body)
925 if err != nil {
926 return err
927 }
928
929 if err := unm.Unmarshal(buf, resp); err != nil {
930 return err
931 }
932
933 return nil
934 }, opts...)
935 if e != nil {
936 return nil, e
937 }
938 return resp, nil
939 }
940
View as plain text