...

Source file src/cloud.google.com/go/bigquery/storage/apiv1beta1/big_query_storage_client.go

Documentation: cloud.google.com/go/bigquery/storage/apiv1beta1

     1  // Copyright 2024 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Code generated by protoc-gen-go_gapic. DO NOT EDIT.
    16  
    17  package storage
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"math"
    25  	"net/http"
    26  	"net/url"
    27  	"time"
    28  
    29  	storagepb "cloud.google.com/go/bigquery/storage/apiv1beta1/storagepb"
    30  	gax "github.com/googleapis/gax-go/v2"
    31  	"google.golang.org/api/googleapi"
    32  	"google.golang.org/api/option"
    33  	"google.golang.org/api/option/internaloption"
    34  	gtransport "google.golang.org/api/transport/grpc"
    35  	httptransport "google.golang.org/api/transport/http"
    36  	"google.golang.org/grpc"
    37  	"google.golang.org/grpc/codes"
    38  	"google.golang.org/grpc/metadata"
    39  	"google.golang.org/protobuf/encoding/protojson"
    40  )
    41  
    42  var newBigQueryStorageClientHook clientHook
    43  
    44  // BigQueryStorageCallOptions contains the retry settings for each method of BigQueryStorageClient.
    45  type BigQueryStorageCallOptions struct {
    46  	CreateReadSession             []gax.CallOption
    47  	ReadRows                      []gax.CallOption
    48  	BatchCreateReadSessionStreams []gax.CallOption
    49  	FinalizeStream                []gax.CallOption
    50  	SplitReadStream               []gax.CallOption
    51  }
    52  
    53  func defaultBigQueryStorageGRPCClientOptions() []option.ClientOption {
    54  	return []option.ClientOption{
    55  		internaloption.WithDefaultEndpoint("bigquerystorage.googleapis.com:443"),
    56  		internaloption.WithDefaultEndpointTemplate("bigquerystorage.UNIVERSE_DOMAIN:443"),
    57  		internaloption.WithDefaultMTLSEndpoint("bigquerystorage.mtls.googleapis.com:443"),
    58  		internaloption.WithDefaultUniverseDomain("googleapis.com"),
    59  		internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
    60  		internaloption.WithDefaultScopes(DefaultAuthScopes()...),
    61  		internaloption.EnableJwtWithScope(),
    62  		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
    63  			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
    64  	}
    65  }
    66  
    67  func defaultBigQueryStorageCallOptions() *BigQueryStorageCallOptions {
    68  	return &BigQueryStorageCallOptions{
    69  		CreateReadSession: []gax.CallOption{
    70  			gax.WithTimeout(600000 * time.Millisecond),
    71  			gax.WithRetry(func() gax.Retryer {
    72  				return gax.OnCodes([]codes.Code{
    73  					codes.DeadlineExceeded,
    74  					codes.Unavailable,
    75  				}, gax.Backoff{
    76  					Initial:    100 * time.Millisecond,
    77  					Max:        60000 * time.Millisecond,
    78  					Multiplier: 1.30,
    79  				})
    80  			}),
    81  		},
    82  		ReadRows: []gax.CallOption{
    83  			gax.WithRetry(func() gax.Retryer {
    84  				return gax.OnCodes([]codes.Code{
    85  					codes.Unavailable,
    86  				}, gax.Backoff{
    87  					Initial:    100 * time.Millisecond,
    88  					Max:        60000 * time.Millisecond,
    89  					Multiplier: 1.30,
    90  				})
    91  			}),
    92  		},
    93  		BatchCreateReadSessionStreams: []gax.CallOption{
    94  			gax.WithTimeout(600000 * time.Millisecond),
    95  			gax.WithRetry(func() gax.Retryer {
    96  				return gax.OnCodes([]codes.Code{
    97  					codes.DeadlineExceeded,
    98  					codes.Unavailable,
    99  				}, gax.Backoff{
   100  					Initial:    100 * time.Millisecond,
   101  					Max:        60000 * time.Millisecond,
   102  					Multiplier: 1.30,
   103  				})
   104  			}),
   105  		},
   106  		FinalizeStream: []gax.CallOption{
   107  			gax.WithTimeout(600000 * time.Millisecond),
   108  			gax.WithRetry(func() gax.Retryer {
   109  				return gax.OnCodes([]codes.Code{
   110  					codes.DeadlineExceeded,
   111  					codes.Unavailable,
   112  				}, gax.Backoff{
   113  					Initial:    100 * time.Millisecond,
   114  					Max:        60000 * time.Millisecond,
   115  					Multiplier: 1.30,
   116  				})
   117  			}),
   118  		},
   119  		SplitReadStream: []gax.CallOption{
   120  			gax.WithTimeout(600000 * time.Millisecond),
   121  			gax.WithRetry(func() gax.Retryer {
   122  				return gax.OnCodes([]codes.Code{
   123  					codes.DeadlineExceeded,
   124  					codes.Unavailable,
   125  				}, gax.Backoff{
   126  					Initial:    100 * time.Millisecond,
   127  					Max:        60000 * time.Millisecond,
   128  					Multiplier: 1.30,
   129  				})
   130  			}),
   131  		},
   132  	}
   133  }
   134  
   135  func defaultBigQueryStorageRESTCallOptions() *BigQueryStorageCallOptions {
   136  	return &BigQueryStorageCallOptions{
   137  		CreateReadSession: []gax.CallOption{
   138  			gax.WithTimeout(600000 * time.Millisecond),
   139  			gax.WithRetry(func() gax.Retryer {
   140  				return gax.OnHTTPCodes(gax.Backoff{
   141  					Initial:    100 * time.Millisecond,
   142  					Max:        60000 * time.Millisecond,
   143  					Multiplier: 1.30,
   144  				},
   145  					http.StatusGatewayTimeout,
   146  					http.StatusServiceUnavailable)
   147  			}),
   148  		},
   149  		ReadRows: []gax.CallOption{
   150  			gax.WithTimeout(86400000 * time.Millisecond),
   151  			gax.WithRetry(func() gax.Retryer {
   152  				return gax.OnHTTPCodes(gax.Backoff{
   153  					Initial:    100 * time.Millisecond,
   154  					Max:        60000 * time.Millisecond,
   155  					Multiplier: 1.30,
   156  				},
   157  					http.StatusServiceUnavailable)
   158  			}),
   159  		},
   160  		BatchCreateReadSessionStreams: []gax.CallOption{
   161  			gax.WithTimeout(600000 * time.Millisecond),
   162  			gax.WithRetry(func() gax.Retryer {
   163  				return gax.OnHTTPCodes(gax.Backoff{
   164  					Initial:    100 * time.Millisecond,
   165  					Max:        60000 * time.Millisecond,
   166  					Multiplier: 1.30,
   167  				},
   168  					http.StatusGatewayTimeout,
   169  					http.StatusServiceUnavailable)
   170  			}),
   171  		},
   172  		FinalizeStream: []gax.CallOption{
   173  			gax.WithTimeout(600000 * time.Millisecond),
   174  			gax.WithRetry(func() gax.Retryer {
   175  				return gax.OnHTTPCodes(gax.Backoff{
   176  					Initial:    100 * time.Millisecond,
   177  					Max:        60000 * time.Millisecond,
   178  					Multiplier: 1.30,
   179  				},
   180  					http.StatusGatewayTimeout,
   181  					http.StatusServiceUnavailable)
   182  			}),
   183  		},
   184  		SplitReadStream: []gax.CallOption{
   185  			gax.WithTimeout(600000 * time.Millisecond),
   186  			gax.WithRetry(func() gax.Retryer {
   187  				return gax.OnHTTPCodes(gax.Backoff{
   188  					Initial:    100 * time.Millisecond,
   189  					Max:        60000 * time.Millisecond,
   190  					Multiplier: 1.30,
   191  				},
   192  					http.StatusGatewayTimeout,
   193  					http.StatusServiceUnavailable)
   194  			}),
   195  		},
   196  	}
   197  }
   198  
   199  // internalBigQueryStorageClient is an interface that defines the methods available from BigQuery Storage API.
   200  type internalBigQueryStorageClient interface {
   201  	Close() error
   202  	setGoogleClientInfo(...string)
   203  	Connection() *grpc.ClientConn
   204  	CreateReadSession(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error)
   205  	ReadRows(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryStorage_ReadRowsClient, error)
   206  	BatchCreateReadSessionStreams(context.Context, *storagepb.BatchCreateReadSessionStreamsRequest, ...gax.CallOption) (*storagepb.BatchCreateReadSessionStreamsResponse, error)
   207  	FinalizeStream(context.Context, *storagepb.FinalizeStreamRequest, ...gax.CallOption) error
   208  	SplitReadStream(context.Context, *storagepb.SplitReadStreamRequest, ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error)
   209  }
   210  
   211  // BigQueryStorageClient is a client for interacting with BigQuery Storage API.
   212  // Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
   213  //
   214  // BigQuery storage API.
   215  //
   216  // The BigQuery storage API can be used to read data stored in BigQuery.
   217  //
   218  // The v1beta1 API is not yet officially deprecated, and will go through a full
   219  // deprecation cycle (https://cloud.google.com/products#product-launch-stages (at https://cloud.google.com/products#product-launch-stages))
   220  // before the service is turned down. However, new code should use the v1 API
   221  // going forward.
   222  type BigQueryStorageClient struct {
   223  	// The internal transport-dependent client.
   224  	internalClient internalBigQueryStorageClient
   225  
   226  	// The call options for this service.
   227  	CallOptions *BigQueryStorageCallOptions
   228  }
   229  
   230  // Wrapper methods routed to the internal client.
   231  
   232  // Close closes the connection to the API service. The user should invoke this when
   233  // the client is no longer required.
   234  func (c *BigQueryStorageClient) Close() error {
   235  	return c.internalClient.Close()
   236  }
   237  
   238  // setGoogleClientInfo sets the name and version of the application in
   239  // the `x-goog-api-client` header passed on each request. Intended for
   240  // use by Google-written clients.
   241  func (c *BigQueryStorageClient) setGoogleClientInfo(keyval ...string) {
   242  	c.internalClient.setGoogleClientInfo(keyval...)
   243  }
   244  
   245  // Connection returns a connection to the API service.
   246  //
   247  // Deprecated: Connections are now pooled so this method does not always
   248  // return the same resource.
   249  func (c *BigQueryStorageClient) Connection() *grpc.ClientConn {
   250  	return c.internalClient.Connection()
   251  }
   252  
   253  // CreateReadSession creates a new read session. A read session divides the contents of a
   254  // BigQuery table into one or more streams, which can then be used to read
   255  // data from the table. The read session also specifies properties of the
   256  // data to be read, such as a list of columns or a push-down filter describing
   257  // the rows to be returned.
   258  //
   259  // A particular row can be read by at most one stream. When the caller has
   260  // reached the end of each stream in the session, then all the data in the
   261  // table has been read.
   262  //
   263  // Read sessions automatically expire 6 hours after they are created and do
   264  // not require manual clean-up by the caller.
   265  func (c *BigQueryStorageClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
   266  	return c.internalClient.CreateReadSession(ctx, req, opts...)
   267  }
   268  
   269  // ReadRows reads rows from the table in the format prescribed by the read session.
   270  // Each response contains one or more table rows, up to a maximum of 10 MiB
   271  // per response; read requests which attempt to read individual rows larger
   272  // than this will fail.
   273  //
   274  // Each request also returns a set of stream statistics reflecting the
   275  // estimated total number of rows in the read stream. This number is computed
   276  // based on the total table size and the number of active streams in the read
   277  // session, and may change as other streams continue to read data.
   278  func (c *BigQueryStorageClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryStorage_ReadRowsClient, error) {
   279  	return c.internalClient.ReadRows(ctx, req, opts...)
   280  }
   281  
   282  // BatchCreateReadSessionStreams creates additional streams for a ReadSession. This API can be used to
   283  // dynamically adjust the parallelism of a batch processing task upwards by
   284  // adding additional workers.
   285  func (c *BigQueryStorageClient) BatchCreateReadSessionStreams(ctx context.Context, req *storagepb.BatchCreateReadSessionStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCreateReadSessionStreamsResponse, error) {
   286  	return c.internalClient.BatchCreateReadSessionStreams(ctx, req, opts...)
   287  }
   288  
   289  // FinalizeStream causes a single stream in a ReadSession to gracefully stop. This
   290  // API can be used to dynamically adjust the parallelism of a batch processing
   291  // task downwards without losing data.
   292  //
   293  // This API does not delete the stream – it remains visible in the
   294  // ReadSession, and any data processed by the stream is not released to other
   295  // streams. However, no additional data will be assigned to the stream once
   296  // this call completes. Callers must continue reading data on the stream until
   297  // the end of the stream is reached so that data which has already been
   298  // assigned to the stream will be processed.
   299  //
   300  // This method will return an error if there are no other live streams
   301  // in the Session, or if SplitReadStream() has been called on the given
   302  // Stream.
   303  func (c *BigQueryStorageClient) FinalizeStream(ctx context.Context, req *storagepb.FinalizeStreamRequest, opts ...gax.CallOption) error {
   304  	return c.internalClient.FinalizeStream(ctx, req, opts...)
   305  }
   306  
   307  // SplitReadStream splits a given read stream into two Streams. These streams are referred to
   308  // as the primary and the residual of the split. The original stream can still
   309  // be read from in the same manner as before. Both of the returned streams can
   310  // also be read from, and the total rows return by both child streams will be
   311  // the same as the rows read from the original stream.
   312  //
   313  // Moreover, the two child streams will be allocated back to back in the
   314  // original Stream. Concretely, it is guaranteed that for streams Original,
   315  // Primary, and Residual, that Original[0-j] = Primary[0-j] and
   316  // Original[j-n] = Residual[0-m] once the streams have been read to
   317  // completion.
   318  //
   319  // This method is guaranteed to be idempotent.
   320  func (c *BigQueryStorageClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
   321  	return c.internalClient.SplitReadStream(ctx, req, opts...)
   322  }
   323  
   324  // bigQueryStorageGRPCClient is a client for interacting with BigQuery Storage API over gRPC transport.
   325  //
   326  // Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
   327  type bigQueryStorageGRPCClient struct {
   328  	// Connection pool of gRPC connections to the service.
   329  	connPool gtransport.ConnPool
   330  
   331  	// Points back to the CallOptions field of the containing BigQueryStorageClient
   332  	CallOptions **BigQueryStorageCallOptions
   333  
   334  	// The gRPC API client.
   335  	bigQueryStorageClient storagepb.BigQueryStorageClient
   336  
   337  	// The x-goog-* metadata to be sent with each request.
   338  	xGoogHeaders []string
   339  }
   340  
   341  // NewBigQueryStorageClient creates a new big query storage client based on gRPC.
   342  // The returned client must be Closed when it is done being used to clean up its underlying connections.
   343  //
   344  // BigQuery storage API.
   345  //
   346  // The BigQuery storage API can be used to read data stored in BigQuery.
   347  //
   348  // The v1beta1 API is not yet officially deprecated, and will go through a full
   349  // deprecation cycle (https://cloud.google.com/products#product-launch-stages (at https://cloud.google.com/products#product-launch-stages))
   350  // before the service is turned down. However, new code should use the v1 API
   351  // going forward.
   352  func NewBigQueryStorageClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryStorageClient, error) {
   353  	clientOpts := defaultBigQueryStorageGRPCClientOptions()
   354  	if newBigQueryStorageClientHook != nil {
   355  		hookOpts, err := newBigQueryStorageClientHook(ctx, clientHookParams{})
   356  		if err != nil {
   357  			return nil, err
   358  		}
   359  		clientOpts = append(clientOpts, hookOpts...)
   360  	}
   361  
   362  	connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
   363  	if err != nil {
   364  		return nil, err
   365  	}
   366  	client := BigQueryStorageClient{CallOptions: defaultBigQueryStorageCallOptions()}
   367  
   368  	c := &bigQueryStorageGRPCClient{
   369  		connPool:              connPool,
   370  		bigQueryStorageClient: storagepb.NewBigQueryStorageClient(connPool),
   371  		CallOptions:           &client.CallOptions,
   372  	}
   373  	c.setGoogleClientInfo()
   374  
   375  	client.internalClient = c
   376  
   377  	return &client, nil
   378  }
   379  
   380  // Connection returns a connection to the API service.
   381  //
   382  // Deprecated: Connections are now pooled so this method does not always
   383  // return the same resource.
   384  func (c *bigQueryStorageGRPCClient) Connection() *grpc.ClientConn {
   385  	return c.connPool.Conn()
   386  }
   387  
   388  // setGoogleClientInfo sets the name and version of the application in
   389  // the `x-goog-api-client` header passed on each request. Intended for
   390  // use by Google-written clients.
   391  func (c *bigQueryStorageGRPCClient) setGoogleClientInfo(keyval ...string) {
   392  	kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
   393  	kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
   394  	c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
   395  }
   396  
   397  // Close closes the connection to the API service. The user should invoke this when
   398  // the client is no longer required.
   399  func (c *bigQueryStorageGRPCClient) Close() error {
   400  	return c.connPool.Close()
   401  }
   402  
   403  // Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
   404  type bigQueryStorageRESTClient struct {
   405  	// The http endpoint to connect to.
   406  	endpoint string
   407  
   408  	// The http client.
   409  	httpClient *http.Client
   410  
   411  	// The x-goog-* headers to be sent with each request.
   412  	xGoogHeaders []string
   413  
   414  	// Points back to the CallOptions field of the containing BigQueryStorageClient
   415  	CallOptions **BigQueryStorageCallOptions
   416  }
   417  
   418  // NewBigQueryStorageRESTClient creates a new big query storage rest client.
   419  //
   420  // BigQuery storage API.
   421  //
   422  // The BigQuery storage API can be used to read data stored in BigQuery.
   423  //
   424  // The v1beta1 API is not yet officially deprecated, and will go through a full
   425  // deprecation cycle (https://cloud.google.com/products#product-launch-stages (at https://cloud.google.com/products#product-launch-stages))
   426  // before the service is turned down. However, new code should use the v1 API
   427  // going forward.
   428  func NewBigQueryStorageRESTClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryStorageClient, error) {
   429  	clientOpts := append(defaultBigQueryStorageRESTClientOptions(), opts...)
   430  	httpClient, endpoint, err := httptransport.NewClient(ctx, clientOpts...)
   431  	if err != nil {
   432  		return nil, err
   433  	}
   434  
   435  	callOpts := defaultBigQueryStorageRESTCallOptions()
   436  	c := &bigQueryStorageRESTClient{
   437  		endpoint:    endpoint,
   438  		httpClient:  httpClient,
   439  		CallOptions: &callOpts,
   440  	}
   441  	c.setGoogleClientInfo()
   442  
   443  	return &BigQueryStorageClient{internalClient: c, CallOptions: callOpts}, nil
   444  }
   445  
   446  func defaultBigQueryStorageRESTClientOptions() []option.ClientOption {
   447  	return []option.ClientOption{
   448  		internaloption.WithDefaultEndpoint("https://bigquerystorage.googleapis.com"),
   449  		internaloption.WithDefaultEndpointTemplate("https://bigquerystorage.UNIVERSE_DOMAIN"),
   450  		internaloption.WithDefaultMTLSEndpoint("https://bigquerystorage.mtls.googleapis.com"),
   451  		internaloption.WithDefaultUniverseDomain("googleapis.com"),
   452  		internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
   453  		internaloption.WithDefaultScopes(DefaultAuthScopes()...),
   454  	}
   455  }
   456  
   457  // setGoogleClientInfo sets the name and version of the application in
   458  // the `x-goog-api-client` header passed on each request. Intended for
   459  // use by Google-written clients.
   460  func (c *bigQueryStorageRESTClient) setGoogleClientInfo(keyval ...string) {
   461  	kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
   462  	kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "rest", "UNKNOWN")
   463  	c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
   464  }
   465  
   466  // Close closes the connection to the API service. The user should invoke this when
   467  // the client is no longer required.
   468  func (c *bigQueryStorageRESTClient) Close() error {
   469  	// Replace httpClient with nil to force cleanup.
   470  	c.httpClient = nil
   471  	return nil
   472  }
   473  
   474  // Connection returns a connection to the API service.
   475  //
   476  // Deprecated: This method always returns nil.
   477  func (c *bigQueryStorageRESTClient) Connection() *grpc.ClientConn {
   478  	return nil
   479  }
   480  func (c *bigQueryStorageGRPCClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
   481  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v&%s=%v", "table_reference.project_id", url.QueryEscape(req.GetTableReference().GetProjectId()), "table_reference.dataset_id", url.QueryEscape(req.GetTableReference().GetDatasetId()))}
   482  
   483  	hds = append(c.xGoogHeaders, hds...)
   484  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   485  	opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
   486  	var resp *storagepb.ReadSession
   487  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   488  		var err error
   489  		resp, err = c.bigQueryStorageClient.CreateReadSession(ctx, req, settings.GRPC...)
   490  		return err
   491  	}, opts...)
   492  	if err != nil {
   493  		return nil, err
   494  	}
   495  	return resp, nil
   496  }
   497  
   498  func (c *bigQueryStorageGRPCClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryStorage_ReadRowsClient, error) {
   499  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_position.stream.name", url.QueryEscape(req.GetReadPosition().GetStream().GetName()))}
   500  
   501  	hds = append(c.xGoogHeaders, hds...)
   502  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   503  	opts = append((*c.CallOptions).ReadRows[0:len((*c.CallOptions).ReadRows):len((*c.CallOptions).ReadRows)], opts...)
   504  	var resp storagepb.BigQueryStorage_ReadRowsClient
   505  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   506  		var err error
   507  		resp, err = c.bigQueryStorageClient.ReadRows(ctx, req, settings.GRPC...)
   508  		return err
   509  	}, opts...)
   510  	if err != nil {
   511  		return nil, err
   512  	}
   513  	return resp, nil
   514  }
   515  
   516  func (c *bigQueryStorageGRPCClient) BatchCreateReadSessionStreams(ctx context.Context, req *storagepb.BatchCreateReadSessionStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCreateReadSessionStreamsResponse, error) {
   517  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "session.name", url.QueryEscape(req.GetSession().GetName()))}
   518  
   519  	hds = append(c.xGoogHeaders, hds...)
   520  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   521  	opts = append((*c.CallOptions).BatchCreateReadSessionStreams[0:len((*c.CallOptions).BatchCreateReadSessionStreams):len((*c.CallOptions).BatchCreateReadSessionStreams)], opts...)
   522  	var resp *storagepb.BatchCreateReadSessionStreamsResponse
   523  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   524  		var err error
   525  		resp, err = c.bigQueryStorageClient.BatchCreateReadSessionStreams(ctx, req, settings.GRPC...)
   526  		return err
   527  	}, opts...)
   528  	if err != nil {
   529  		return nil, err
   530  	}
   531  	return resp, nil
   532  }
   533  
   534  func (c *bigQueryStorageGRPCClient) FinalizeStream(ctx context.Context, req *storagepb.FinalizeStreamRequest, opts ...gax.CallOption) error {
   535  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "stream.name", url.QueryEscape(req.GetStream().GetName()))}
   536  
   537  	hds = append(c.xGoogHeaders, hds...)
   538  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   539  	opts = append((*c.CallOptions).FinalizeStream[0:len((*c.CallOptions).FinalizeStream):len((*c.CallOptions).FinalizeStream)], opts...)
   540  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   541  		var err error
   542  		_, err = c.bigQueryStorageClient.FinalizeStream(ctx, req, settings.GRPC...)
   543  		return err
   544  	}, opts...)
   545  	return err
   546  }
   547  
   548  func (c *bigQueryStorageGRPCClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
   549  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "original_stream.name", url.QueryEscape(req.GetOriginalStream().GetName()))}
   550  
   551  	hds = append(c.xGoogHeaders, hds...)
   552  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   553  	opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
   554  	var resp *storagepb.SplitReadStreamResponse
   555  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   556  		var err error
   557  		resp, err = c.bigQueryStorageClient.SplitReadStream(ctx, req, settings.GRPC...)
   558  		return err
   559  	}, opts...)
   560  	if err != nil {
   561  		return nil, err
   562  	}
   563  	return resp, nil
   564  }
   565  
   566  // CreateReadSession creates a new read session. A read session divides the contents of a
   567  // BigQuery table into one or more streams, which can then be used to read
   568  // data from the table. The read session also specifies properties of the
   569  // data to be read, such as a list of columns or a push-down filter describing
   570  // the rows to be returned.
   571  //
   572  // A particular row can be read by at most one stream. When the caller has
   573  // reached the end of each stream in the session, then all the data in the
   574  // table has been read.
   575  //
   576  // Read sessions automatically expire 6 hours after they are created and do
   577  // not require manual clean-up by the caller.
   578  func (c *bigQueryStorageRESTClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
   579  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
   580  	jsonReq, err := m.Marshal(req)
   581  	if err != nil {
   582  		return nil, err
   583  	}
   584  
   585  	baseUrl, err := url.Parse(c.endpoint)
   586  	if err != nil {
   587  		return nil, err
   588  	}
   589  	baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetTableReference().GetProjectId())
   590  
   591  	// Build HTTP headers from client and context metadata.
   592  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v&%s=%v", "table_reference.project_id", url.QueryEscape(req.GetTableReference().GetProjectId()), "table_reference.dataset_id", url.QueryEscape(req.GetTableReference().GetDatasetId()))}
   593  
   594  	hds = append(c.xGoogHeaders, hds...)
   595  	hds = append(hds, "Content-Type", "application/json")
   596  	headers := gax.BuildHeaders(ctx, hds...)
   597  	opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
   598  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
   599  	resp := &storagepb.ReadSession{}
   600  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   601  		if settings.Path != "" {
   602  			baseUrl.Path = settings.Path
   603  		}
   604  		httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
   605  		if err != nil {
   606  			return err
   607  		}
   608  		httpReq = httpReq.WithContext(ctx)
   609  		httpReq.Header = headers
   610  
   611  		httpRsp, err := c.httpClient.Do(httpReq)
   612  		if err != nil {
   613  			return err
   614  		}
   615  		defer httpRsp.Body.Close()
   616  
   617  		if err = googleapi.CheckResponse(httpRsp); err != nil {
   618  			return err
   619  		}
   620  
   621  		buf, err := io.ReadAll(httpRsp.Body)
   622  		if err != nil {
   623  			return err
   624  		}
   625  
   626  		if err := unm.Unmarshal(buf, resp); err != nil {
   627  			return err
   628  		}
   629  
   630  		return nil
   631  	}, opts...)
   632  	if e != nil {
   633  		return nil, e
   634  	}
   635  	return resp, nil
   636  }
   637  
   638  // ReadRows reads rows from the table in the format prescribed by the read session.
   639  // Each response contains one or more table rows, up to a maximum of 10 MiB
   640  // per response; read requests which attempt to read individual rows larger
   641  // than this will fail.
   642  //
   643  // Each request also returns a set of stream statistics reflecting the
   644  // estimated total number of rows in the read stream. This number is computed
   645  // based on the total table size and the number of active streams in the read
   646  // session, and may change as other streams continue to read data.
   647  func (c *bigQueryStorageRESTClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryStorage_ReadRowsClient, error) {
   648  	baseUrl, err := url.Parse(c.endpoint)
   649  	if err != nil {
   650  		return nil, err
   651  	}
   652  	baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetReadPosition().GetStream().GetName())
   653  
   654  	params := url.Values{}
   655  	if req.GetReadPosition().GetOffset() != 0 {
   656  		params.Add("readPosition.offset", fmt.Sprintf("%v", req.GetReadPosition().GetOffset()))
   657  	}
   658  
   659  	baseUrl.RawQuery = params.Encode()
   660  
   661  	// Build HTTP headers from client and context metadata.
   662  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_position.stream.name", url.QueryEscape(req.GetReadPosition().GetStream().GetName()))}
   663  
   664  	hds = append(c.xGoogHeaders, hds...)
   665  	hds = append(hds, "Content-Type", "application/json")
   666  	headers := gax.BuildHeaders(ctx, hds...)
   667  	var streamClient *readRowsRESTClient
   668  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   669  		if settings.Path != "" {
   670  			baseUrl.Path = settings.Path
   671  		}
   672  		httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
   673  		if err != nil {
   674  			return err
   675  		}
   676  		httpReq = httpReq.WithContext(ctx)
   677  		httpReq.Header = headers
   678  
   679  		httpRsp, err := c.httpClient.Do(httpReq)
   680  		if err != nil {
   681  			return err
   682  		}
   683  
   684  		if err = googleapi.CheckResponse(httpRsp); err != nil {
   685  			return err
   686  		}
   687  
   688  		streamClient = &readRowsRESTClient{
   689  			ctx:    ctx,
   690  			md:     metadata.MD(httpRsp.Header),
   691  			stream: gax.NewProtoJSONStreamReader(httpRsp.Body, (&storagepb.ReadRowsResponse{}).ProtoReflect().Type()),
   692  		}
   693  		return nil
   694  	}, opts...)
   695  
   696  	return streamClient, e
   697  }
   698  
   699  // readRowsRESTClient is the stream client used to consume the server stream created by
   700  // the REST implementation of ReadRows.
   701  type readRowsRESTClient struct {
   702  	ctx    context.Context
   703  	md     metadata.MD
   704  	stream *gax.ProtoJSONStream
   705  }
   706  
   707  func (c *readRowsRESTClient) Recv() (*storagepb.ReadRowsResponse, error) {
   708  	if err := c.ctx.Err(); err != nil {
   709  		defer c.stream.Close()
   710  		return nil, err
   711  	}
   712  	msg, err := c.stream.Recv()
   713  	if err != nil {
   714  		defer c.stream.Close()
   715  		return nil, err
   716  	}
   717  	res := msg.(*storagepb.ReadRowsResponse)
   718  	return res, nil
   719  }
   720  
   721  func (c *readRowsRESTClient) Header() (metadata.MD, error) {
   722  	return c.md, nil
   723  }
   724  
   725  func (c *readRowsRESTClient) Trailer() metadata.MD {
   726  	return c.md
   727  }
   728  
   729  func (c *readRowsRESTClient) CloseSend() error {
   730  	// This is a no-op to fulfill the interface.
   731  	return fmt.Errorf("this method is not implemented for a server-stream")
   732  }
   733  
   734  func (c *readRowsRESTClient) Context() context.Context {
   735  	return c.ctx
   736  }
   737  
   738  func (c *readRowsRESTClient) SendMsg(m interface{}) error {
   739  	// This is a no-op to fulfill the interface.
   740  	return fmt.Errorf("this method is not implemented for a server-stream")
   741  }
   742  
   743  func (c *readRowsRESTClient) RecvMsg(m interface{}) error {
   744  	// This is a no-op to fulfill the interface.
   745  	return fmt.Errorf("this method is not implemented, use Recv")
   746  }
   747  
   748  // BatchCreateReadSessionStreams creates additional streams for a ReadSession. This API can be used to
   749  // dynamically adjust the parallelism of a batch processing task upwards by
   750  // adding additional workers.
   751  func (c *bigQueryStorageRESTClient) BatchCreateReadSessionStreams(ctx context.Context, req *storagepb.BatchCreateReadSessionStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCreateReadSessionStreamsResponse, error) {
   752  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
   753  	jsonReq, err := m.Marshal(req)
   754  	if err != nil {
   755  		return nil, err
   756  	}
   757  
   758  	baseUrl, err := url.Parse(c.endpoint)
   759  	if err != nil {
   760  		return nil, err
   761  	}
   762  	baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetSession().GetName())
   763  
   764  	// Build HTTP headers from client and context metadata.
   765  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "session.name", url.QueryEscape(req.GetSession().GetName()))}
   766  
   767  	hds = append(c.xGoogHeaders, hds...)
   768  	hds = append(hds, "Content-Type", "application/json")
   769  	headers := gax.BuildHeaders(ctx, hds...)
   770  	opts = append((*c.CallOptions).BatchCreateReadSessionStreams[0:len((*c.CallOptions).BatchCreateReadSessionStreams):len((*c.CallOptions).BatchCreateReadSessionStreams)], opts...)
   771  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
   772  	resp := &storagepb.BatchCreateReadSessionStreamsResponse{}
   773  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   774  		if settings.Path != "" {
   775  			baseUrl.Path = settings.Path
   776  		}
   777  		httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
   778  		if err != nil {
   779  			return err
   780  		}
   781  		httpReq = httpReq.WithContext(ctx)
   782  		httpReq.Header = headers
   783  
   784  		httpRsp, err := c.httpClient.Do(httpReq)
   785  		if err != nil {
   786  			return err
   787  		}
   788  		defer httpRsp.Body.Close()
   789  
   790  		if err = googleapi.CheckResponse(httpRsp); err != nil {
   791  			return err
   792  		}
   793  
   794  		buf, err := io.ReadAll(httpRsp.Body)
   795  		if err != nil {
   796  			return err
   797  		}
   798  
   799  		if err := unm.Unmarshal(buf, resp); err != nil {
   800  			return err
   801  		}
   802  
   803  		return nil
   804  	}, opts...)
   805  	if e != nil {
   806  		return nil, e
   807  	}
   808  	return resp, nil
   809  }
   810  
   811  // FinalizeStream causes a single stream in a ReadSession to gracefully stop. This
   812  // API can be used to dynamically adjust the parallelism of a batch processing
   813  // task downwards without losing data.
   814  //
   815  // This API does not delete the stream – it remains visible in the
   816  // ReadSession, and any data processed by the stream is not released to other
   817  // streams. However, no additional data will be assigned to the stream once
   818  // this call completes. Callers must continue reading data on the stream until
   819  // the end of the stream is reached so that data which has already been
   820  // assigned to the stream will be processed.
   821  //
   822  // This method will return an error if there are no other live streams
   823  // in the Session, or if SplitReadStream() has been called on the given
   824  // Stream.
   825  func (c *bigQueryStorageRESTClient) FinalizeStream(ctx context.Context, req *storagepb.FinalizeStreamRequest, opts ...gax.CallOption) error {
   826  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
   827  	jsonReq, err := m.Marshal(req)
   828  	if err != nil {
   829  		return err
   830  	}
   831  
   832  	baseUrl, err := url.Parse(c.endpoint)
   833  	if err != nil {
   834  		return err
   835  	}
   836  	baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetStream().GetName())
   837  
   838  	// Build HTTP headers from client and context metadata.
   839  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "stream.name", url.QueryEscape(req.GetStream().GetName()))}
   840  
   841  	hds = append(c.xGoogHeaders, hds...)
   842  	hds = append(hds, "Content-Type", "application/json")
   843  	headers := gax.BuildHeaders(ctx, hds...)
   844  	return gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   845  		if settings.Path != "" {
   846  			baseUrl.Path = settings.Path
   847  		}
   848  		httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
   849  		if err != nil {
   850  			return err
   851  		}
   852  		httpReq = httpReq.WithContext(ctx)
   853  		httpReq.Header = headers
   854  
   855  		httpRsp, err := c.httpClient.Do(httpReq)
   856  		if err != nil {
   857  			return err
   858  		}
   859  		defer httpRsp.Body.Close()
   860  
   861  		// Returns nil if there is no error, otherwise wraps
   862  		// the response code and body into a non-nil error
   863  		return googleapi.CheckResponse(httpRsp)
   864  	}, opts...)
   865  }
   866  
   867  // SplitReadStream splits a given read stream into two Streams. These streams are referred to
   868  // as the primary and the residual of the split. The original stream can still
   869  // be read from in the same manner as before. Both of the returned streams can
   870  // also be read from, and the total rows return by both child streams will be
   871  // the same as the rows read from the original stream.
   872  //
   873  // Moreover, the two child streams will be allocated back to back in the
   874  // original Stream. Concretely, it is guaranteed that for streams Original,
   875  // Primary, and Residual, that Original[0-j] = Primary[0-j] and
   876  // Original[j-n] = Residual[0-m] once the streams have been read to
   877  // completion.
   878  //
   879  // This method is guaranteed to be idempotent.
   880  func (c *bigQueryStorageRESTClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
   881  	baseUrl, err := url.Parse(c.endpoint)
   882  	if err != nil {
   883  		return nil, err
   884  	}
   885  	baseUrl.Path += fmt.Sprintf("/v1beta1/%v", req.GetOriginalStream().GetName())
   886  
   887  	params := url.Values{}
   888  	if req.GetFraction() != 0 {
   889  		params.Add("fraction", fmt.Sprintf("%v", req.GetFraction()))
   890  	}
   891  
   892  	baseUrl.RawQuery = params.Encode()
   893  
   894  	// Build HTTP headers from client and context metadata.
   895  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "original_stream.name", url.QueryEscape(req.GetOriginalStream().GetName()))}
   896  
   897  	hds = append(c.xGoogHeaders, hds...)
   898  	hds = append(hds, "Content-Type", "application/json")
   899  	headers := gax.BuildHeaders(ctx, hds...)
   900  	opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
   901  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
   902  	resp := &storagepb.SplitReadStreamResponse{}
   903  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   904  		if settings.Path != "" {
   905  			baseUrl.Path = settings.Path
   906  		}
   907  		httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
   908  		if err != nil {
   909  			return err
   910  		}
   911  		httpReq = httpReq.WithContext(ctx)
   912  		httpReq.Header = headers
   913  
   914  		httpRsp, err := c.httpClient.Do(httpReq)
   915  		if err != nil {
   916  			return err
   917  		}
   918  		defer httpRsp.Body.Close()
   919  
   920  		if err = googleapi.CheckResponse(httpRsp); err != nil {
   921  			return err
   922  		}
   923  
   924  		buf, err := io.ReadAll(httpRsp.Body)
   925  		if err != nil {
   926  			return err
   927  		}
   928  
   929  		if err := unm.Unmarshal(buf, resp); err != nil {
   930  			return err
   931  		}
   932  
   933  		return nil
   934  	}, opts...)
   935  	if e != nil {
   936  		return nil, e
   937  	}
   938  	return resp, nil
   939  }
   940  

View as plain text