1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package storage
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "math"
25 "net/http"
26 "net/url"
27 "time"
28
29 storagepb "cloud.google.com/go/bigquery/storage/apiv1beta2/storagepb"
30 gax "github.com/googleapis/gax-go/v2"
31 "google.golang.org/api/googleapi"
32 "google.golang.org/api/option"
33 "google.golang.org/api/option/internaloption"
34 gtransport "google.golang.org/api/transport/grpc"
35 httptransport "google.golang.org/api/transport/http"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/protobuf/encoding/protojson"
39 )
40
// newBigQueryWriteClientHook, when non-nil, is invoked by
// NewBigQueryWriteClient to obtain additional client options, which are
// appended after the package defaults and before the caller's options.
var newBigQueryWriteClientHook clientHook
42
43
// BigQueryWriteCallOptions holds the per-method gax.CallOption lists that the
// transport clients prepend to the options supplied on each call. Every field
// is named after the BigQueryWriteClient method it applies to.
type BigQueryWriteCallOptions struct {
	CreateWriteStream       []gax.CallOption
	AppendRows              []gax.CallOption
	GetWriteStream          []gax.CallOption
	FinalizeWriteStream     []gax.CallOption
	BatchCommitWriteStreams []gax.CallOption
	FlushRows               []gax.CallOption
}
52
53 func defaultBigQueryWriteGRPCClientOptions() []option.ClientOption {
54 return []option.ClientOption{
55 internaloption.WithDefaultEndpoint("bigquerystorage.googleapis.com:443"),
56 internaloption.WithDefaultEndpointTemplate("bigquerystorage.UNIVERSE_DOMAIN:443"),
57 internaloption.WithDefaultMTLSEndpoint("bigquerystorage.mtls.googleapis.com:443"),
58 internaloption.WithDefaultUniverseDomain("googleapis.com"),
59 internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
60 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
61 internaloption.EnableJwtWithScope(),
62 option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
63 grpc.MaxCallRecvMsgSize(math.MaxInt32))),
64 }
65 }
66
67 func defaultBigQueryWriteCallOptions() *BigQueryWriteCallOptions {
68 return &BigQueryWriteCallOptions{
69 CreateWriteStream: []gax.CallOption{
70 gax.WithTimeout(600000 * time.Millisecond),
71 gax.WithRetry(func() gax.Retryer {
72 return gax.OnCodes([]codes.Code{
73 codes.DeadlineExceeded,
74 codes.Unavailable,
75 codes.ResourceExhausted,
76 }, gax.Backoff{
77 Initial: 100 * time.Millisecond,
78 Max: 60000 * time.Millisecond,
79 Multiplier: 1.30,
80 })
81 }),
82 },
83 AppendRows: []gax.CallOption{
84 gax.WithRetry(func() gax.Retryer {
85 return gax.OnCodes([]codes.Code{
86 codes.Unavailable,
87 codes.ResourceExhausted,
88 }, gax.Backoff{
89 Initial: 100 * time.Millisecond,
90 Max: 60000 * time.Millisecond,
91 Multiplier: 1.30,
92 })
93 }),
94 },
95 GetWriteStream: []gax.CallOption{
96 gax.WithTimeout(600000 * time.Millisecond),
97 gax.WithRetry(func() gax.Retryer {
98 return gax.OnCodes([]codes.Code{
99 codes.DeadlineExceeded,
100 codes.Unavailable,
101 }, gax.Backoff{
102 Initial: 100 * time.Millisecond,
103 Max: 60000 * time.Millisecond,
104 Multiplier: 1.30,
105 })
106 }),
107 },
108 FinalizeWriteStream: []gax.CallOption{
109 gax.WithTimeout(600000 * time.Millisecond),
110 gax.WithRetry(func() gax.Retryer {
111 return gax.OnCodes([]codes.Code{
112 codes.DeadlineExceeded,
113 codes.Unavailable,
114 }, gax.Backoff{
115 Initial: 100 * time.Millisecond,
116 Max: 60000 * time.Millisecond,
117 Multiplier: 1.30,
118 })
119 }),
120 },
121 BatchCommitWriteStreams: []gax.CallOption{
122 gax.WithTimeout(600000 * time.Millisecond),
123 gax.WithRetry(func() gax.Retryer {
124 return gax.OnCodes([]codes.Code{
125 codes.DeadlineExceeded,
126 codes.Unavailable,
127 }, gax.Backoff{
128 Initial: 100 * time.Millisecond,
129 Max: 60000 * time.Millisecond,
130 Multiplier: 1.30,
131 })
132 }),
133 },
134 FlushRows: []gax.CallOption{
135 gax.WithTimeout(600000 * time.Millisecond),
136 gax.WithRetry(func() gax.Retryer {
137 return gax.OnCodes([]codes.Code{
138 codes.DeadlineExceeded,
139 codes.Unavailable,
140 }, gax.Backoff{
141 Initial: 100 * time.Millisecond,
142 Max: 60000 * time.Millisecond,
143 Multiplier: 1.30,
144 })
145 }),
146 },
147 }
148 }
149
150 func defaultBigQueryWriteRESTCallOptions() *BigQueryWriteCallOptions {
151 return &BigQueryWriteCallOptions{
152 CreateWriteStream: []gax.CallOption{
153 gax.WithTimeout(600000 * time.Millisecond),
154 gax.WithRetry(func() gax.Retryer {
155 return gax.OnHTTPCodes(gax.Backoff{
156 Initial: 100 * time.Millisecond,
157 Max: 60000 * time.Millisecond,
158 Multiplier: 1.30,
159 },
160 http.StatusGatewayTimeout,
161 http.StatusServiceUnavailable,
162 http.StatusTooManyRequests)
163 }),
164 },
165 AppendRows: []gax.CallOption{
166 gax.WithTimeout(86400000 * time.Millisecond),
167 gax.WithRetry(func() gax.Retryer {
168 return gax.OnHTTPCodes(gax.Backoff{
169 Initial: 100 * time.Millisecond,
170 Max: 60000 * time.Millisecond,
171 Multiplier: 1.30,
172 },
173 http.StatusServiceUnavailable,
174 http.StatusTooManyRequests)
175 }),
176 },
177 GetWriteStream: []gax.CallOption{
178 gax.WithTimeout(600000 * time.Millisecond),
179 gax.WithRetry(func() gax.Retryer {
180 return gax.OnHTTPCodes(gax.Backoff{
181 Initial: 100 * time.Millisecond,
182 Max: 60000 * time.Millisecond,
183 Multiplier: 1.30,
184 },
185 http.StatusGatewayTimeout,
186 http.StatusServiceUnavailable)
187 }),
188 },
189 FinalizeWriteStream: []gax.CallOption{
190 gax.WithTimeout(600000 * time.Millisecond),
191 gax.WithRetry(func() gax.Retryer {
192 return gax.OnHTTPCodes(gax.Backoff{
193 Initial: 100 * time.Millisecond,
194 Max: 60000 * time.Millisecond,
195 Multiplier: 1.30,
196 },
197 http.StatusGatewayTimeout,
198 http.StatusServiceUnavailable)
199 }),
200 },
201 BatchCommitWriteStreams: []gax.CallOption{
202 gax.WithTimeout(600000 * time.Millisecond),
203 gax.WithRetry(func() gax.Retryer {
204 return gax.OnHTTPCodes(gax.Backoff{
205 Initial: 100 * time.Millisecond,
206 Max: 60000 * time.Millisecond,
207 Multiplier: 1.30,
208 },
209 http.StatusGatewayTimeout,
210 http.StatusServiceUnavailable)
211 }),
212 },
213 FlushRows: []gax.CallOption{
214 gax.WithTimeout(600000 * time.Millisecond),
215 gax.WithRetry(func() gax.Retryer {
216 return gax.OnHTTPCodes(gax.Backoff{
217 Initial: 100 * time.Millisecond,
218 Max: 60000 * time.Millisecond,
219 Multiplier: 1.30,
220 },
221 http.StatusGatewayTimeout,
222 http.StatusServiceUnavailable)
223 }),
224 },
225 }
226 }
227
228
// internalBigQueryWriteClient is the transport-level contract that
// BigQueryWriteClient delegates every call to. Both bigQueryWriteGRPCClient
// and bigQueryWriteRESTClient in this file provide these methods.
type internalBigQueryWriteClient interface {
	Close() error
	setGoogleClientInfo(...string)
	Connection() *grpc.ClientConn
	CreateWriteStream(context.Context, *storagepb.CreateWriteStreamRequest, ...gax.CallOption) (*storagepb.WriteStream, error)
	AppendRows(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
	GetWriteStream(context.Context, *storagepb.GetWriteStreamRequest, ...gax.CallOption) (*storagepb.WriteStream, error)
	FinalizeWriteStream(context.Context, *storagepb.FinalizeWriteStreamRequest, ...gax.CallOption) (*storagepb.FinalizeWriteStreamResponse, error)
	BatchCommitWriteStreams(context.Context, *storagepb.BatchCommitWriteStreamsRequest, ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error)
	FlushRows(context.Context, *storagepb.FlushRowsRequest, ...gax.CallOption) (*storagepb.FlushRowsResponse, error)
}
240
241
242
243
244
245
246
247
248
249
250
251
252
// BigQueryWriteClient is the public client for the BigQuery Write API. It is
// a thin wrapper that forwards every method to a transport-specific
// implementation.
type BigQueryWriteClient struct {
	// internalClient is the transport implementation (gRPC or REST) that
	// actually performs the calls.
	internalClient internalBigQueryWriteClient

	// CallOptions holds the default per-method call options.
	CallOptions *BigQueryWriteCallOptions
}
260
261
262
263
264
// Close releases the client's resources by delegating to the underlying
// transport client's Close and returns its error.
func (c *BigQueryWriteClient) Close() error {
	return c.internalClient.Close()
}
268
269
270
271
// setGoogleClientInfo forwards the given key/value strings to the underlying
// transport client, which uses them to build its x-goog-api-client header.
func (c *BigQueryWriteClient) setGoogleClientInfo(keyval ...string) {
	c.internalClient.setGoogleClientInfo(keyval...)
}
275
276
277
278
279
// Connection returns whatever gRPC connection the underlying transport client
// reports. The REST transport in this file always reports nil.
func (c *BigQueryWriteClient) Connection() *grpc.ClientConn {
	return c.internalClient.Connection()
}
283
284
285
286
287
288
289
290
291
// CreateWriteStream issues the CreateWriteStream call through the underlying
// transport client, passing ctx, req and opts through unchanged.
func (c *BigQueryWriteClient) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
	return c.internalClient.CreateWriteStream(ctx, req, opts...)
}
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// AppendRows opens an AppendRows stream through the underlying transport
// client. The REST transport in this file does not support it and returns an
// error.
func (c *BigQueryWriteClient) AppendRows(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
	return c.internalClient.AppendRows(ctx, opts...)
}
322
323
324
325
// GetWriteStream issues the GetWriteStream call through the underlying
// transport client, passing ctx, req and opts through unchanged.
func (c *BigQueryWriteClient) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
	return c.internalClient.GetWriteStream(ctx, req, opts...)
}
329
330
331
332
333
// FinalizeWriteStream issues the FinalizeWriteStream call through the
// underlying transport client, passing ctx, req and opts through unchanged.
func (c *BigQueryWriteClient) FinalizeWriteStream(ctx context.Context, req *storagepb.FinalizeWriteStreamRequest, opts ...gax.CallOption) (*storagepb.FinalizeWriteStreamResponse, error) {
	return c.internalClient.FinalizeWriteStream(ctx, req, opts...)
}
337
338
339
340
341
342
343
344
// BatchCommitWriteStreams issues the BatchCommitWriteStreams call through the
// underlying transport client, passing ctx, req and opts through unchanged.
func (c *BigQueryWriteClient) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
	return c.internalClient.BatchCommitWriteStreams(ctx, req, opts...)
}
348
349
350
351
352
353
354
355
356
// FlushRows issues the FlushRows call through the underlying transport
// client, passing ctx, req and opts through unchanged.
func (c *BigQueryWriteClient) FlushRows(ctx context.Context, req *storagepb.FlushRowsRequest, opts ...gax.CallOption) (*storagepb.FlushRowsResponse, error) {
	return c.internalClient.FlushRows(ctx, req, opts...)
}
360
361
362
363
// bigQueryWriteGRPCClient is the gRPC transport implementation behind
// BigQueryWriteClient.
type bigQueryWriteGRPCClient struct {
	// connPool is the gRPC connection pool; Connection returns connPool.Conn()
	// and Close closes the pool.
	connPool gtransport.ConnPool

	// CallOptions points at the CallOptions field of the wrapping
	// BigQueryWriteClient (set in NewBigQueryWriteClient), so the current
	// value of that field is read on every call.
	CallOptions **BigQueryWriteCallOptions

	// bigQueryWriteClient is the generated gRPC stub used to issue the RPCs.
	bigQueryWriteClient storagepb.BigQueryWriteClient

	// xGoogHeaders is the x-goog-api-client header key/value pair built by
	// setGoogleClientInfo and attached to every outgoing call.
	xGoogHeaders []string
}
377
378
379
380
381
382
383
384
385
386
387
388
389
390 func NewBigQueryWriteClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryWriteClient, error) {
391 clientOpts := defaultBigQueryWriteGRPCClientOptions()
392 if newBigQueryWriteClientHook != nil {
393 hookOpts, err := newBigQueryWriteClientHook(ctx, clientHookParams{})
394 if err != nil {
395 return nil, err
396 }
397 clientOpts = append(clientOpts, hookOpts...)
398 }
399
400 connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
401 if err != nil {
402 return nil, err
403 }
404 client := BigQueryWriteClient{CallOptions: defaultBigQueryWriteCallOptions()}
405
406 c := &bigQueryWriteGRPCClient{
407 connPool: connPool,
408 bigQueryWriteClient: storagepb.NewBigQueryWriteClient(connPool),
409 CallOptions: &client.CallOptions,
410 }
411 c.setGoogleClientInfo()
412
413 client.internalClient = c
414
415 return &client, nil
416 }
417
418
419
420
421
// Connection returns a connection obtained from the client's gRPC connection
// pool via connPool.Conn().
func (c *bigQueryWriteGRPCClient) Connection() *grpc.ClientConn {
	return c.connPool.Conn()
}
425
426
427
428
429 func (c *bigQueryWriteGRPCClient) setGoogleClientInfo(keyval ...string) {
430 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
431 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
432 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
433 }
434
435
436
// Close closes the underlying gRPC connection pool and returns its error.
func (c *bigQueryWriteGRPCClient) Close() error {
	return c.connPool.Close()
}
440
441
// bigQueryWriteRESTClient is the HTTP/JSON transport implementation behind
// BigQueryWriteClient.
type bigQueryWriteRESTClient struct {
	// endpoint is the base URL that each request parses and extends with the
	// method-specific path.
	endpoint string

	// httpClient sends the requests; Close sets it to nil.
	httpClient *http.Client

	// xGoogHeaders is the x-goog-api-client header key/value pair built by
	// setGoogleClientInfo and attached to every outgoing request.
	xGoogHeaders []string

	// CallOptions points to the call-option set whose per-method defaults are
	// prepended to the options passed on each call.
	CallOptions **BigQueryWriteCallOptions
}
455
456
457
458
459
460
461
462
463
464
465
466
467 func NewBigQueryWriteRESTClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryWriteClient, error) {
468 clientOpts := append(defaultBigQueryWriteRESTClientOptions(), opts...)
469 httpClient, endpoint, err := httptransport.NewClient(ctx, clientOpts...)
470 if err != nil {
471 return nil, err
472 }
473
474 callOpts := defaultBigQueryWriteRESTCallOptions()
475 c := &bigQueryWriteRESTClient{
476 endpoint: endpoint,
477 httpClient: httpClient,
478 CallOptions: &callOpts,
479 }
480 c.setGoogleClientInfo()
481
482 return &BigQueryWriteClient{internalClient: c, CallOptions: callOpts}, nil
483 }
484
485 func defaultBigQueryWriteRESTClientOptions() []option.ClientOption {
486 return []option.ClientOption{
487 internaloption.WithDefaultEndpoint("https://bigquerystorage.googleapis.com"),
488 internaloption.WithDefaultEndpointTemplate("https://bigquerystorage.UNIVERSE_DOMAIN"),
489 internaloption.WithDefaultMTLSEndpoint("https://bigquerystorage.mtls.googleapis.com"),
490 internaloption.WithDefaultUniverseDomain("googleapis.com"),
491 internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
492 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
493 }
494 }
495
496
497
498
499 func (c *bigQueryWriteRESTClient) setGoogleClientInfo(keyval ...string) {
500 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
501 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "rest", "UNKNOWN")
502 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
503 }
504
505
506
// Close drops the client's reference to its HTTP client and always returns
// nil. Because httpClient is nil afterwards, the request methods must not be
// called on a closed client.
func (c *bigQueryWriteRESTClient) Close() error {

	c.httpClient = nil
	return nil
}
512
513
514
515
// Connection always returns nil for the REST transport; it exists to satisfy
// internalBigQueryWriteClient.
func (c *bigQueryWriteRESTClient) Connection() *grpc.ClientConn {
	return nil
}
519 func (c *bigQueryWriteGRPCClient) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
520 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "parent", url.QueryEscape(req.GetParent()))}
521
522 hds = append(c.xGoogHeaders, hds...)
523 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
524 opts = append((*c.CallOptions).CreateWriteStream[0:len((*c.CallOptions).CreateWriteStream):len((*c.CallOptions).CreateWriteStream)], opts...)
525 var resp *storagepb.WriteStream
526 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
527 var err error
528 resp, err = c.bigQueryWriteClient.CreateWriteStream(ctx, req, settings.GRPC...)
529 return err
530 }, opts...)
531 if err != nil {
532 return nil, err
533 }
534 return resp, nil
535 }
536
537 func (c *bigQueryWriteGRPCClient) AppendRows(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
538 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, c.xGoogHeaders...)
539 var resp storagepb.BigQueryWrite_AppendRowsClient
540 opts = append((*c.CallOptions).AppendRows[0:len((*c.CallOptions).AppendRows):len((*c.CallOptions).AppendRows)], opts...)
541 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
542 var err error
543 resp, err = c.bigQueryWriteClient.AppendRows(ctx, settings.GRPC...)
544 return err
545 }, opts...)
546 if err != nil {
547 return nil, err
548 }
549 return resp, nil
550 }
551
552 func (c *bigQueryWriteGRPCClient) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
553 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
554
555 hds = append(c.xGoogHeaders, hds...)
556 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
557 opts = append((*c.CallOptions).GetWriteStream[0:len((*c.CallOptions).GetWriteStream):len((*c.CallOptions).GetWriteStream)], opts...)
558 var resp *storagepb.WriteStream
559 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
560 var err error
561 resp, err = c.bigQueryWriteClient.GetWriteStream(ctx, req, settings.GRPC...)
562 return err
563 }, opts...)
564 if err != nil {
565 return nil, err
566 }
567 return resp, nil
568 }
569
570 func (c *bigQueryWriteGRPCClient) FinalizeWriteStream(ctx context.Context, req *storagepb.FinalizeWriteStreamRequest, opts ...gax.CallOption) (*storagepb.FinalizeWriteStreamResponse, error) {
571 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
572
573 hds = append(c.xGoogHeaders, hds...)
574 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
575 opts = append((*c.CallOptions).FinalizeWriteStream[0:len((*c.CallOptions).FinalizeWriteStream):len((*c.CallOptions).FinalizeWriteStream)], opts...)
576 var resp *storagepb.FinalizeWriteStreamResponse
577 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
578 var err error
579 resp, err = c.bigQueryWriteClient.FinalizeWriteStream(ctx, req, settings.GRPC...)
580 return err
581 }, opts...)
582 if err != nil {
583 return nil, err
584 }
585 return resp, nil
586 }
587
588 func (c *bigQueryWriteGRPCClient) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
589 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "parent", url.QueryEscape(req.GetParent()))}
590
591 hds = append(c.xGoogHeaders, hds...)
592 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
593 opts = append((*c.CallOptions).BatchCommitWriteStreams[0:len((*c.CallOptions).BatchCommitWriteStreams):len((*c.CallOptions).BatchCommitWriteStreams)], opts...)
594 var resp *storagepb.BatchCommitWriteStreamsResponse
595 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
596 var err error
597 resp, err = c.bigQueryWriteClient.BatchCommitWriteStreams(ctx, req, settings.GRPC...)
598 return err
599 }, opts...)
600 if err != nil {
601 return nil, err
602 }
603 return resp, nil
604 }
605
606 func (c *bigQueryWriteGRPCClient) FlushRows(ctx context.Context, req *storagepb.FlushRowsRequest, opts ...gax.CallOption) (*storagepb.FlushRowsResponse, error) {
607 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "write_stream", url.QueryEscape(req.GetWriteStream()))}
608
609 hds = append(c.xGoogHeaders, hds...)
610 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
611 opts = append((*c.CallOptions).FlushRows[0:len((*c.CallOptions).FlushRows):len((*c.CallOptions).FlushRows)], opts...)
612 var resp *storagepb.FlushRowsResponse
613 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
614 var err error
615 resp, err = c.bigQueryWriteClient.FlushRows(ctx, req, settings.GRPC...)
616 return err
617 }, opts...)
618 if err != nil {
619 return nil, err
620 }
621 return resp, nil
622 }
623
624
625
626
627
628
629
630
631
632 func (c *bigQueryWriteRESTClient) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
633 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
634 body := req.GetWriteStream()
635 jsonReq, err := m.Marshal(body)
636 if err != nil {
637 return nil, err
638 }
639
640 baseUrl, err := url.Parse(c.endpoint)
641 if err != nil {
642 return nil, err
643 }
644 baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetParent())
645
646
647 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "parent", url.QueryEscape(req.GetParent()))}
648
649 hds = append(c.xGoogHeaders, hds...)
650 hds = append(hds, "Content-Type", "application/json")
651 headers := gax.BuildHeaders(ctx, hds...)
652 opts = append((*c.CallOptions).CreateWriteStream[0:len((*c.CallOptions).CreateWriteStream):len((*c.CallOptions).CreateWriteStream)], opts...)
653 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
654 resp := &storagepb.WriteStream{}
655 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
656 if settings.Path != "" {
657 baseUrl.Path = settings.Path
658 }
659 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
660 if err != nil {
661 return err
662 }
663 httpReq = httpReq.WithContext(ctx)
664 httpReq.Header = headers
665
666 httpRsp, err := c.httpClient.Do(httpReq)
667 if err != nil {
668 return err
669 }
670 defer httpRsp.Body.Close()
671
672 if err = googleapi.CheckResponse(httpRsp); err != nil {
673 return err
674 }
675
676 buf, err := io.ReadAll(httpRsp.Body)
677 if err != nil {
678 return err
679 }
680
681 if err := unm.Unmarshal(buf, resp); err != nil {
682 return err
683 }
684
685 return nil
686 }, opts...)
687 if e != nil {
688 return nil, e
689 }
690 return resp, nil
691 }
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
// AppendRows is not implemented for the REST transport: it ignores ctx and
// opts and always returns a nil stream with an error.
func (c *bigQueryWriteRESTClient) AppendRows(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
	return nil, fmt.Errorf("AppendRows not yet supported for REST clients")
}
719
720
721
722
723 func (c *bigQueryWriteRESTClient) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
724 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
725 jsonReq, err := m.Marshal(req)
726 if err != nil {
727 return nil, err
728 }
729
730 baseUrl, err := url.Parse(c.endpoint)
731 if err != nil {
732 return nil, err
733 }
734 baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetName())
735
736
737 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
738
739 hds = append(c.xGoogHeaders, hds...)
740 hds = append(hds, "Content-Type", "application/json")
741 headers := gax.BuildHeaders(ctx, hds...)
742 opts = append((*c.CallOptions).GetWriteStream[0:len((*c.CallOptions).GetWriteStream):len((*c.CallOptions).GetWriteStream)], opts...)
743 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
744 resp := &storagepb.WriteStream{}
745 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
746 if settings.Path != "" {
747 baseUrl.Path = settings.Path
748 }
749 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
750 if err != nil {
751 return err
752 }
753 httpReq = httpReq.WithContext(ctx)
754 httpReq.Header = headers
755
756 httpRsp, err := c.httpClient.Do(httpReq)
757 if err != nil {
758 return err
759 }
760 defer httpRsp.Body.Close()
761
762 if err = googleapi.CheckResponse(httpRsp); err != nil {
763 return err
764 }
765
766 buf, err := io.ReadAll(httpRsp.Body)
767 if err != nil {
768 return err
769 }
770
771 if err := unm.Unmarshal(buf, resp); err != nil {
772 return err
773 }
774
775 return nil
776 }, opts...)
777 if e != nil {
778 return nil, e
779 }
780 return resp, nil
781 }
782
783
784
785
786
787 func (c *bigQueryWriteRESTClient) FinalizeWriteStream(ctx context.Context, req *storagepb.FinalizeWriteStreamRequest, opts ...gax.CallOption) (*storagepb.FinalizeWriteStreamResponse, error) {
788 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
789 jsonReq, err := m.Marshal(req)
790 if err != nil {
791 return nil, err
792 }
793
794 baseUrl, err := url.Parse(c.endpoint)
795 if err != nil {
796 return nil, err
797 }
798 baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetName())
799
800
801 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
802
803 hds = append(c.xGoogHeaders, hds...)
804 hds = append(hds, "Content-Type", "application/json")
805 headers := gax.BuildHeaders(ctx, hds...)
806 opts = append((*c.CallOptions).FinalizeWriteStream[0:len((*c.CallOptions).FinalizeWriteStream):len((*c.CallOptions).FinalizeWriteStream)], opts...)
807 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
808 resp := &storagepb.FinalizeWriteStreamResponse{}
809 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
810 if settings.Path != "" {
811 baseUrl.Path = settings.Path
812 }
813 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
814 if err != nil {
815 return err
816 }
817 httpReq = httpReq.WithContext(ctx)
818 httpReq.Header = headers
819
820 httpRsp, err := c.httpClient.Do(httpReq)
821 if err != nil {
822 return err
823 }
824 defer httpRsp.Body.Close()
825
826 if err = googleapi.CheckResponse(httpRsp); err != nil {
827 return err
828 }
829
830 buf, err := io.ReadAll(httpRsp.Body)
831 if err != nil {
832 return err
833 }
834
835 if err := unm.Unmarshal(buf, resp); err != nil {
836 return err
837 }
838
839 return nil
840 }, opts...)
841 if e != nil {
842 return nil, e
843 }
844 return resp, nil
845 }
846
847
848
849
850
851
852
853
854 func (c *bigQueryWriteRESTClient) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
855 baseUrl, err := url.Parse(c.endpoint)
856 if err != nil {
857 return nil, err
858 }
859 baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetParent())
860
861 params := url.Values{}
862 if items := req.GetWriteStreams(); len(items) > 0 {
863 for _, item := range items {
864 params.Add("writeStreams", fmt.Sprintf("%v", item))
865 }
866 }
867
868 baseUrl.RawQuery = params.Encode()
869
870
871 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "parent", url.QueryEscape(req.GetParent()))}
872
873 hds = append(c.xGoogHeaders, hds...)
874 hds = append(hds, "Content-Type", "application/json")
875 headers := gax.BuildHeaders(ctx, hds...)
876 opts = append((*c.CallOptions).BatchCommitWriteStreams[0:len((*c.CallOptions).BatchCommitWriteStreams):len((*c.CallOptions).BatchCommitWriteStreams)], opts...)
877 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
878 resp := &storagepb.BatchCommitWriteStreamsResponse{}
879 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
880 if settings.Path != "" {
881 baseUrl.Path = settings.Path
882 }
883 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
884 if err != nil {
885 return err
886 }
887 httpReq = httpReq.WithContext(ctx)
888 httpReq.Header = headers
889
890 httpRsp, err := c.httpClient.Do(httpReq)
891 if err != nil {
892 return err
893 }
894 defer httpRsp.Body.Close()
895
896 if err = googleapi.CheckResponse(httpRsp); err != nil {
897 return err
898 }
899
900 buf, err := io.ReadAll(httpRsp.Body)
901 if err != nil {
902 return err
903 }
904
905 if err := unm.Unmarshal(buf, resp); err != nil {
906 return err
907 }
908
909 return nil
910 }, opts...)
911 if e != nil {
912 return nil, e
913 }
914 return resp, nil
915 }
916
917
918
919
920
921
922
923
924
925 func (c *bigQueryWriteRESTClient) FlushRows(ctx context.Context, req *storagepb.FlushRowsRequest, opts ...gax.CallOption) (*storagepb.FlushRowsResponse, error) {
926 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
927 jsonReq, err := m.Marshal(req)
928 if err != nil {
929 return nil, err
930 }
931
932 baseUrl, err := url.Parse(c.endpoint)
933 if err != nil {
934 return nil, err
935 }
936 baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetWriteStream())
937
938
939 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "write_stream", url.QueryEscape(req.GetWriteStream()))}
940
941 hds = append(c.xGoogHeaders, hds...)
942 hds = append(hds, "Content-Type", "application/json")
943 headers := gax.BuildHeaders(ctx, hds...)
944 opts = append((*c.CallOptions).FlushRows[0:len((*c.CallOptions).FlushRows):len((*c.CallOptions).FlushRows)], opts...)
945 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
946 resp := &storagepb.FlushRowsResponse{}
947 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
948 if settings.Path != "" {
949 baseUrl.Path = settings.Path
950 }
951 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
952 if err != nil {
953 return err
954 }
955 httpReq = httpReq.WithContext(ctx)
956 httpReq.Header = headers
957
958 httpRsp, err := c.httpClient.Do(httpReq)
959 if err != nil {
960 return err
961 }
962 defer httpRsp.Body.Close()
963
964 if err = googleapi.CheckResponse(httpRsp); err != nil {
965 return err
966 }
967
968 buf, err := io.ReadAll(httpRsp.Body)
969 if err != nil {
970 return err
971 }
972
973 if err := unm.Unmarshal(buf, resp); err != nil {
974 return err
975 }
976
977 return nil
978 }, opts...)
979 if e != nil {
980 return nil, e
981 }
982 return resp, nil
983 }
984
View as plain text