...

Source file src/cloud.google.com/go/bigquery/storage/apiv1beta2/big_query_read_client.go

Documentation: cloud.google.com/go/bigquery/storage/apiv1beta2

     1  // Copyright 2024 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Code generated by protoc-gen-go_gapic. DO NOT EDIT.
    16  
    17  package storage
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"math"
    25  	"net/http"
    26  	"net/url"
    27  	"time"
    28  
    29  	storagepb "cloud.google.com/go/bigquery/storage/apiv1beta2/storagepb"
    30  	gax "github.com/googleapis/gax-go/v2"
    31  	"google.golang.org/api/googleapi"
    32  	"google.golang.org/api/option"
    33  	"google.golang.org/api/option/internaloption"
    34  	gtransport "google.golang.org/api/transport/grpc"
    35  	httptransport "google.golang.org/api/transport/http"
    36  	"google.golang.org/grpc"
    37  	"google.golang.org/grpc/codes"
    38  	"google.golang.org/grpc/metadata"
    39  	"google.golang.org/protobuf/encoding/protojson"
    40  )
    41  
// newBigQueryReadClientHook, when non-nil, is called by NewBigQueryReadClient
// to obtain additional client options, which are appended after the gRPC
// defaults and before the caller-supplied options.
var newBigQueryReadClientHook clientHook
    43  
// BigQueryReadCallOptions contains the retry settings for each method of BigQueryReadClient.
//
// Every field is named after the RPC it configures and holds the default
// gax.CallOption values for that RPC (timeouts and retry policies in the
// defaults built by this package).
type BigQueryReadCallOptions struct {
	CreateReadSession []gax.CallOption
	ReadRows          []gax.CallOption
	SplitReadStream   []gax.CallOption
}
    50  
    51  func defaultBigQueryReadGRPCClientOptions() []option.ClientOption {
    52  	return []option.ClientOption{
    53  		internaloption.WithDefaultEndpoint("bigquerystorage.googleapis.com:443"),
    54  		internaloption.WithDefaultEndpointTemplate("bigquerystorage.UNIVERSE_DOMAIN:443"),
    55  		internaloption.WithDefaultMTLSEndpoint("bigquerystorage.mtls.googleapis.com:443"),
    56  		internaloption.WithDefaultUniverseDomain("googleapis.com"),
    57  		internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
    58  		internaloption.WithDefaultScopes(DefaultAuthScopes()...),
    59  		internaloption.EnableJwtWithScope(),
    60  		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
    61  			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
    62  	}
    63  }
    64  
    65  func defaultBigQueryReadCallOptions() *BigQueryReadCallOptions {
    66  	return &BigQueryReadCallOptions{
    67  		CreateReadSession: []gax.CallOption{
    68  			gax.WithTimeout(600000 * time.Millisecond),
    69  			gax.WithRetry(func() gax.Retryer {
    70  				return gax.OnCodes([]codes.Code{
    71  					codes.DeadlineExceeded,
    72  					codes.Unavailable,
    73  				}, gax.Backoff{
    74  					Initial:    100 * time.Millisecond,
    75  					Max:        60000 * time.Millisecond,
    76  					Multiplier: 1.30,
    77  				})
    78  			}),
    79  		},
    80  		ReadRows: []gax.CallOption{
    81  			gax.WithRetry(func() gax.Retryer {
    82  				return gax.OnCodes([]codes.Code{
    83  					codes.Unavailable,
    84  				}, gax.Backoff{
    85  					Initial:    100 * time.Millisecond,
    86  					Max:        60000 * time.Millisecond,
    87  					Multiplier: 1.30,
    88  				})
    89  			}),
    90  		},
    91  		SplitReadStream: []gax.CallOption{
    92  			gax.WithTimeout(600000 * time.Millisecond),
    93  			gax.WithRetry(func() gax.Retryer {
    94  				return gax.OnCodes([]codes.Code{
    95  					codes.DeadlineExceeded,
    96  					codes.Unavailable,
    97  				}, gax.Backoff{
    98  					Initial:    100 * time.Millisecond,
    99  					Max:        60000 * time.Millisecond,
   100  					Multiplier: 1.30,
   101  				})
   102  			}),
   103  		},
   104  	}
   105  }
   106  
   107  func defaultBigQueryReadRESTCallOptions() *BigQueryReadCallOptions {
   108  	return &BigQueryReadCallOptions{
   109  		CreateReadSession: []gax.CallOption{
   110  			gax.WithTimeout(600000 * time.Millisecond),
   111  			gax.WithRetry(func() gax.Retryer {
   112  				return gax.OnHTTPCodes(gax.Backoff{
   113  					Initial:    100 * time.Millisecond,
   114  					Max:        60000 * time.Millisecond,
   115  					Multiplier: 1.30,
   116  				},
   117  					http.StatusGatewayTimeout,
   118  					http.StatusServiceUnavailable)
   119  			}),
   120  		},
   121  		ReadRows: []gax.CallOption{
   122  			gax.WithTimeout(86400000 * time.Millisecond),
   123  			gax.WithRetry(func() gax.Retryer {
   124  				return gax.OnHTTPCodes(gax.Backoff{
   125  					Initial:    100 * time.Millisecond,
   126  					Max:        60000 * time.Millisecond,
   127  					Multiplier: 1.30,
   128  				},
   129  					http.StatusServiceUnavailable)
   130  			}),
   131  		},
   132  		SplitReadStream: []gax.CallOption{
   133  			gax.WithTimeout(600000 * time.Millisecond),
   134  			gax.WithRetry(func() gax.Retryer {
   135  				return gax.OnHTTPCodes(gax.Backoff{
   136  					Initial:    100 * time.Millisecond,
   137  					Max:        60000 * time.Millisecond,
   138  					Multiplier: 1.30,
   139  				},
   140  					http.StatusGatewayTimeout,
   141  					http.StatusServiceUnavailable)
   142  			}),
   143  		},
   144  	}
   145  }
   146  
// internalBigQueryReadClient is an interface that defines the methods available from BigQuery Storage API.
//
// It is the transport-specific backend behind BigQueryReadClient; in this file
// it is implemented by bigQueryReadGRPCClient and bigQueryReadRESTClient, and
// every exported BigQueryReadClient method forwards to the method of the same
// name here.
type internalBigQueryReadClient interface {
	Close() error
	setGoogleClientInfo(...string)
	Connection() *grpc.ClientConn
	CreateReadSession(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error)
	ReadRows(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error)
	SplitReadStream(context.Context, *storagepb.SplitReadStreamRequest, ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error)
}
   156  
// BigQueryReadClient is a client for interacting with BigQuery Storage API.
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
//
// BigQuery Read API.
//
// The Read API can be used to read data from BigQuery.
//
// New code that does not also use the Write API should use the v1 Read API
// going forward.
type BigQueryReadClient struct {
	// The internal transport-dependent client. Every method of
	// BigQueryReadClient forwards to it.
	internalClient internalBigQueryReadClient

	// The call options for this service.
	CallOptions *BigQueryReadCallOptions
}
   173  
   174  // Wrapper methods routed to the internal client.
   175  
// Close closes the connection to the API service. The user should invoke this when
// the client is no longer required.
//
// It forwards to the transport-specific internal client and returns its error.
func (c *BigQueryReadClient) Close() error {
	return c.internalClient.Close()
}
   181  
// setGoogleClientInfo sets the name and version of the application in
// the `x-goog-api-client` header passed on each request. Intended for
// use by Google-written clients.
//
// keyval is forwarded unchanged to the transport-specific internal client.
func (c *BigQueryReadClient) setGoogleClientInfo(keyval ...string) {
	c.internalClient.setGoogleClientInfo(keyval...)
}
   188  
// Connection returns a connection to the API service.
//
// The value comes from the transport-specific internal client.
//
// Deprecated: Connections are now pooled so this method does not always
// return the same resource.
func (c *BigQueryReadClient) Connection() *grpc.ClientConn {
	return c.internalClient.Connection()
}
   196  
// CreateReadSession creates a new read session. A read session divides the contents of a
// BigQuery table into one or more streams, which can then be used to read
// data from the table. The read session also specifies properties of the
// data to be read, such as a list of columns or a push-down filter describing
// the rows to be returned.
//
// A particular row can be read by at most one stream. When the caller has
// reached the end of each stream in the session, then all the data in the
// table has been read.
//
// Data is assigned to each stream such that roughly the same number of
// rows can be read from each stream. Because the server-side unit for
// assigning data is collections of rows, the API does not guarantee that
// each stream will return the same number of rows. Additionally, the
// limits are enforced based on the number of pre-filtered rows, so some
// filters can lead to lopsided assignments.
//
// Read sessions automatically expire 6 hours after they are created and do
// not require manual clean-up by the caller.
//
// The call is forwarded, with ctx, req and opts unchanged, to the
// transport-specific internal client.
func (c *BigQueryReadClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
	return c.internalClient.CreateReadSession(ctx, req, opts...)
}
   219  
// ReadRows reads rows from the stream in the format prescribed by the ReadSession.
// Each response contains one or more table rows, up to a maximum of 100 MiB
// per response; read requests which attempt to read individual rows larger
// than 100 MiB will fail.
//
// Each request also returns a set of stream statistics reflecting the current
// state of the stream.
//
// The call is forwarded, with ctx, req and opts unchanged, to the
// transport-specific internal client, which returns the stream client.
func (c *BigQueryReadClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
	return c.internalClient.ReadRows(ctx, req, opts...)
}
   230  
// SplitReadStream splits a given ReadStream into two ReadStream objects. These
// ReadStream objects are referred to as the primary and the residual
// streams of the split. The original ReadStream can still be read from in
// the same manner as before. Both of the returned ReadStream objects can
// also be read from, and the rows returned by both child streams will be
// the same as the rows read from the original stream.
//
// Moreover, the two child streams will be allocated back-to-back in the
// original ReadStream. Concretely, it is guaranteed that for streams
// original, primary, and residual, that original[0-j] = primary[0-j] and
// original[j-n] = residual[0-m] once the streams have been read to
// completion.
//
// The call is forwarded, with ctx, req and opts unchanged, to the
// transport-specific internal client.
func (c *BigQueryReadClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
	return c.internalClient.SplitReadStream(ctx, req, opts...)
}
   246  
// bigQueryReadGRPCClient is a client for interacting with BigQuery Storage API over gRPC transport.
//
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
type bigQueryReadGRPCClient struct {
	// Connection pool of gRPC connections to the service.
	connPool gtransport.ConnPool

	// Points back to the CallOptions field of the containing BigQueryReadClient.
	// It is dereferenced on every call to find the per-method defaults.
	CallOptions **BigQueryReadCallOptions

	// The gRPC API client.
	bigQueryReadClient storagepb.BigQueryReadClient

	// The x-goog-* metadata to be sent with each request, stored as a flat
	// key/value list built by setGoogleClientInfo.
	xGoogHeaders []string
}
   263  
   264  // NewBigQueryReadClient creates a new big query read client based on gRPC.
   265  // The returned client must be Closed when it is done being used to clean up its underlying connections.
   266  //
   267  // BigQuery Read API.
   268  //
   269  // The Read API can be used to read data from BigQuery.
   270  //
   271  // New code should use the v1 Read API going forward, if they don’t use Write
   272  // API at the same time.
   273  func NewBigQueryReadClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryReadClient, error) {
   274  	clientOpts := defaultBigQueryReadGRPCClientOptions()
   275  	if newBigQueryReadClientHook != nil {
   276  		hookOpts, err := newBigQueryReadClientHook(ctx, clientHookParams{})
   277  		if err != nil {
   278  			return nil, err
   279  		}
   280  		clientOpts = append(clientOpts, hookOpts...)
   281  	}
   282  
   283  	connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
   284  	if err != nil {
   285  		return nil, err
   286  	}
   287  	client := BigQueryReadClient{CallOptions: defaultBigQueryReadCallOptions()}
   288  
   289  	c := &bigQueryReadGRPCClient{
   290  		connPool:           connPool,
   291  		bigQueryReadClient: storagepb.NewBigQueryReadClient(connPool),
   292  		CallOptions:        &client.CallOptions,
   293  	}
   294  	c.setGoogleClientInfo()
   295  
   296  	client.internalClient = c
   297  
   298  	return &client, nil
   299  }
   300  
// Connection returns a connection to the API service, obtained from the
// client's connection pool.
//
// Deprecated: Connections are now pooled so this method does not always
// return the same resource.
func (c *bigQueryReadGRPCClient) Connection() *grpc.ClientConn {
	return c.connPool.Conn()
}
   308  
   309  // setGoogleClientInfo sets the name and version of the application in
   310  // the `x-goog-api-client` header passed on each request. Intended for
   311  // use by Google-written clients.
   312  func (c *bigQueryReadGRPCClient) setGoogleClientInfo(keyval ...string) {
   313  	kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
   314  	kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
   315  	c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
   316  }
   317  
// Close closes the connection to the API service. The user should invoke this when
// the client is no longer required.
//
// It closes the underlying connection pool and returns that call's error.
func (c *bigQueryReadGRPCClient) Close() error {
	return c.connPool.Close()
}
   323  
// bigQueryReadRESTClient is a client for interacting with BigQuery Storage API
// over HTTP, exchanging JSON-encoded protobuf messages.
//
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
type bigQueryReadRESTClient struct {
	// The http endpoint to connect to. It is parsed as a URL on every call
	// and the method-specific path is appended to it.
	endpoint string

	// The http client. Close sets it to nil.
	httpClient *http.Client

	// The x-goog-* headers to be sent with each request, stored as a flat
	// key/value list built by setGoogleClientInfo.
	xGoogHeaders []string

	// Pointer to the *BigQueryReadCallOptions that is dereferenced to find the
	// per-method default call options.
	CallOptions **BigQueryReadCallOptions
}
   338  
   339  // NewBigQueryReadRESTClient creates a new big query read rest client.
   340  //
   341  // BigQuery Read API.
   342  //
   343  // The Read API can be used to read data from BigQuery.
   344  //
   345  // New code should use the v1 Read API going forward, if they don’t use Write
   346  // API at the same time.
   347  func NewBigQueryReadRESTClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryReadClient, error) {
   348  	clientOpts := append(defaultBigQueryReadRESTClientOptions(), opts...)
   349  	httpClient, endpoint, err := httptransport.NewClient(ctx, clientOpts...)
   350  	if err != nil {
   351  		return nil, err
   352  	}
   353  
   354  	callOpts := defaultBigQueryReadRESTCallOptions()
   355  	c := &bigQueryReadRESTClient{
   356  		endpoint:    endpoint,
   357  		httpClient:  httpClient,
   358  		CallOptions: &callOpts,
   359  	}
   360  	c.setGoogleClientInfo()
   361  
   362  	return &BigQueryReadClient{internalClient: c, CallOptions: callOpts}, nil
   363  }
   364  
   365  func defaultBigQueryReadRESTClientOptions() []option.ClientOption {
   366  	return []option.ClientOption{
   367  		internaloption.WithDefaultEndpoint("https://bigquerystorage.googleapis.com"),
   368  		internaloption.WithDefaultEndpointTemplate("https://bigquerystorage.UNIVERSE_DOMAIN"),
   369  		internaloption.WithDefaultMTLSEndpoint("https://bigquerystorage.mtls.googleapis.com"),
   370  		internaloption.WithDefaultUniverseDomain("googleapis.com"),
   371  		internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
   372  		internaloption.WithDefaultScopes(DefaultAuthScopes()...),
   373  	}
   374  }
   375  
   376  // setGoogleClientInfo sets the name and version of the application in
   377  // the `x-goog-api-client` header passed on each request. Intended for
   378  // use by Google-written clients.
   379  func (c *bigQueryReadRESTClient) setGoogleClientInfo(keyval ...string) {
   380  	kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
   381  	kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "rest", "UNKNOWN")
   382  	c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
   383  }
   384  
// Close closes the connection to the API service. The user should invoke this when
// the client is no longer required.
//
// For the REST transport this only drops the reference to the HTTP client;
// it always returns nil.
func (c *bigQueryReadRESTClient) Close() error {
	// Replace httpClient with nil to force cleanup.
	// NOTE(review): any method called after Close would dereference a nil
	// httpClient.
	c.httpClient = nil
	return nil
}
   392  
// Connection returns a connection to the API service.
//
// The REST transport has no gRPC connection, so the result is always nil.
//
// Deprecated: This method always returns nil.
func (c *bigQueryReadRESTClient) Connection() *grpc.ClientConn {
	return nil
}
   399  func (c *bigQueryReadGRPCClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
   400  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_session.table", url.QueryEscape(req.GetReadSession().GetTable()))}
   401  
   402  	hds = append(c.xGoogHeaders, hds...)
   403  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   404  	opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
   405  	var resp *storagepb.ReadSession
   406  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   407  		var err error
   408  		resp, err = c.bigQueryReadClient.CreateReadSession(ctx, req, settings.GRPC...)
   409  		return err
   410  	}, opts...)
   411  	if err != nil {
   412  		return nil, err
   413  	}
   414  	return resp, nil
   415  }
   416  
   417  func (c *bigQueryReadGRPCClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
   418  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_stream", url.QueryEscape(req.GetReadStream()))}
   419  
   420  	hds = append(c.xGoogHeaders, hds...)
   421  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   422  	opts = append((*c.CallOptions).ReadRows[0:len((*c.CallOptions).ReadRows):len((*c.CallOptions).ReadRows)], opts...)
   423  	var resp storagepb.BigQueryRead_ReadRowsClient
   424  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   425  		var err error
   426  		resp, err = c.bigQueryReadClient.ReadRows(ctx, req, settings.GRPC...)
   427  		return err
   428  	}, opts...)
   429  	if err != nil {
   430  		return nil, err
   431  	}
   432  	return resp, nil
   433  }
   434  
   435  func (c *bigQueryReadGRPCClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
   436  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
   437  
   438  	hds = append(c.xGoogHeaders, hds...)
   439  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   440  	opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
   441  	var resp *storagepb.SplitReadStreamResponse
   442  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   443  		var err error
   444  		resp, err = c.bigQueryReadClient.SplitReadStream(ctx, req, settings.GRPC...)
   445  		return err
   446  	}, opts...)
   447  	if err != nil {
   448  		return nil, err
   449  	}
   450  	return resp, nil
   451  }
   452  
   453  // CreateReadSession creates a new read session. A read session divides the contents of a
   454  // BigQuery table into one or more streams, which can then be used to read
   455  // data from the table. The read session also specifies properties of the
   456  // data to be read, such as a list of columns or a push-down filter describing
   457  // the rows to be returned.
   458  //
   459  // A particular row can be read by at most one stream. When the caller has
   460  // reached the end of each stream in the session, then all the data in the
   461  // table has been read.
   462  //
   463  // Data is assigned to each stream such that roughly the same number of
   464  // rows can be read from each stream. Because the server-side unit for
   465  // assigning data is collections of rows, the API does not guarantee that
   466  // each stream will return the same number or rows. Additionally, the
   467  // limits are enforced based on the number of pre-filtered rows, so some
   468  // filters can lead to lopsided assignments.
   469  //
   470  // Read sessions automatically expire 6 hours after they are created and do
   471  // not require manual clean-up by the caller.
   472  func (c *bigQueryReadRESTClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
   473  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
   474  	jsonReq, err := m.Marshal(req)
   475  	if err != nil {
   476  		return nil, err
   477  	}
   478  
   479  	baseUrl, err := url.Parse(c.endpoint)
   480  	if err != nil {
   481  		return nil, err
   482  	}
   483  	baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetReadSession().GetTable())
   484  
   485  	// Build HTTP headers from client and context metadata.
   486  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_session.table", url.QueryEscape(req.GetReadSession().GetTable()))}
   487  
   488  	hds = append(c.xGoogHeaders, hds...)
   489  	hds = append(hds, "Content-Type", "application/json")
   490  	headers := gax.BuildHeaders(ctx, hds...)
   491  	opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
   492  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
   493  	resp := &storagepb.ReadSession{}
   494  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   495  		if settings.Path != "" {
   496  			baseUrl.Path = settings.Path
   497  		}
   498  		httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
   499  		if err != nil {
   500  			return err
   501  		}
   502  		httpReq = httpReq.WithContext(ctx)
   503  		httpReq.Header = headers
   504  
   505  		httpRsp, err := c.httpClient.Do(httpReq)
   506  		if err != nil {
   507  			return err
   508  		}
   509  		defer httpRsp.Body.Close()
   510  
   511  		if err = googleapi.CheckResponse(httpRsp); err != nil {
   512  			return err
   513  		}
   514  
   515  		buf, err := io.ReadAll(httpRsp.Body)
   516  		if err != nil {
   517  			return err
   518  		}
   519  
   520  		if err := unm.Unmarshal(buf, resp); err != nil {
   521  			return err
   522  		}
   523  
   524  		return nil
   525  	}, opts...)
   526  	if e != nil {
   527  		return nil, e
   528  	}
   529  	return resp, nil
   530  }
   531  
   532  // ReadRows reads rows from the stream in the format prescribed by the ReadSession.
   533  // Each response contains one or more table rows, up to a maximum of 100 MiB
   534  // per response; read requests which attempt to read individual rows larger
   535  // than 100 MiB will fail.
   536  //
   537  // Each request also returns a set of stream statistics reflecting the current
   538  // state of the stream.
   539  func (c *bigQueryReadRESTClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
   540  	baseUrl, err := url.Parse(c.endpoint)
   541  	if err != nil {
   542  		return nil, err
   543  	}
   544  	baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetReadStream())
   545  
   546  	params := url.Values{}
   547  	if req.GetOffset() != 0 {
   548  		params.Add("offset", fmt.Sprintf("%v", req.GetOffset()))
   549  	}
   550  
   551  	baseUrl.RawQuery = params.Encode()
   552  
   553  	// Build HTTP headers from client and context metadata.
   554  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_stream", url.QueryEscape(req.GetReadStream()))}
   555  
   556  	hds = append(c.xGoogHeaders, hds...)
   557  	hds = append(hds, "Content-Type", "application/json")
   558  	headers := gax.BuildHeaders(ctx, hds...)
   559  	var streamClient *readRowsRESTClient
   560  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   561  		if settings.Path != "" {
   562  			baseUrl.Path = settings.Path
   563  		}
   564  		httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
   565  		if err != nil {
   566  			return err
   567  		}
   568  		httpReq = httpReq.WithContext(ctx)
   569  		httpReq.Header = headers
   570  
   571  		httpRsp, err := c.httpClient.Do(httpReq)
   572  		if err != nil {
   573  			return err
   574  		}
   575  
   576  		if err = googleapi.CheckResponse(httpRsp); err != nil {
   577  			return err
   578  		}
   579  
   580  		streamClient = &readRowsRESTClient{
   581  			ctx:    ctx,
   582  			md:     metadata.MD(httpRsp.Header),
   583  			stream: gax.NewProtoJSONStreamReader(httpRsp.Body, (&storagepb.ReadRowsResponse{}).ProtoReflect().Type()),
   584  		}
   585  		return nil
   586  	}, opts...)
   587  
   588  	return streamClient, e
   589  }
   590  
// readRowsRESTClient is the stream client used to consume the server stream created by
// the REST implementation of ReadRows.
type readRowsRESTClient struct {
	// ctx is the context of the HTTP request; Recv checks it before reading.
	ctx context.Context
	// md holds the HTTP response headers, exposed via Header and Trailer.
	md metadata.MD
	// stream decodes ReadRowsResponse messages from the HTTP response body.
	stream *gax.ProtoJSONStream
}
   598  
   599  func (c *readRowsRESTClient) Recv() (*storagepb.ReadRowsResponse, error) {
   600  	if err := c.ctx.Err(); err != nil {
   601  		defer c.stream.Close()
   602  		return nil, err
   603  	}
   604  	msg, err := c.stream.Recv()
   605  	if err != nil {
   606  		defer c.stream.Close()
   607  		return nil, err
   608  	}
   609  	res := msg.(*storagepb.ReadRowsResponse)
   610  	return res, nil
   611  }
   612  
// Header returns the metadata stored on the stream client. The error is
// always nil.
func (c *readRowsRESTClient) Header() (metadata.MD, error) {
	return c.md, nil
}
   616  
// Trailer returns the metadata stored on the stream client, the same value
// that Header returns.
func (c *readRowsRESTClient) Trailer() metadata.MD {
	return c.md
}
   620  
// CloseSend exists only to fulfill the stream interface. It does nothing
// except return a non-nil error.
func (c *readRowsRESTClient) CloseSend() error {
	// Not supported here: always reports an error.
	return fmt.Errorf("this method is not implemented for a server-stream")
}
   625  
// Context returns the context stored on the stream client.
func (c *readRowsRESTClient) Context() context.Context {
	return c.ctx
}
   629  
// SendMsg exists only to fulfill the stream interface. It ignores m and
// always returns a non-nil error.
func (c *readRowsRESTClient) SendMsg(m interface{}) error {
	// Not supported here: always reports an error.
	return fmt.Errorf("this method is not implemented for a server-stream")
}
   634  
// RecvMsg exists only to fulfill the stream interface. It ignores m and
// always returns a non-nil error; use Recv to read messages.
func (c *readRowsRESTClient) RecvMsg(m interface{}) error {
	// Not supported here: always reports an error pointing callers at Recv.
	return fmt.Errorf("this method is not implemented, use Recv")
}
   639  
   640  // SplitReadStream splits a given ReadStream into two ReadStream objects. These
   641  // ReadStream objects are referred to as the primary and the residual
   642  // streams of the split. The original ReadStream can still be read from in
   643  // the same manner as before. Both of the returned ReadStream objects can
   644  // also be read from, and the rows returned by both child streams will be
   645  // the same as the rows read from the original stream.
   646  //
   647  // Moreover, the two child streams will be allocated back-to-back in the
   648  // original ReadStream. Concretely, it is guaranteed that for streams
   649  // original, primary, and residual, that original[0-j] = primary[0-j] and
   650  // original[j-n] = residual[0-m] once the streams have been read to
   651  // completion.
   652  func (c *bigQueryReadRESTClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
   653  	baseUrl, err := url.Parse(c.endpoint)
   654  	if err != nil {
   655  		return nil, err
   656  	}
   657  	baseUrl.Path += fmt.Sprintf("/v1beta2/%v", req.GetName())
   658  
   659  	params := url.Values{}
   660  	if req.GetFraction() != 0 {
   661  		params.Add("fraction", fmt.Sprintf("%v", req.GetFraction()))
   662  	}
   663  
   664  	baseUrl.RawQuery = params.Encode()
   665  
   666  	// Build HTTP headers from client and context metadata.
   667  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
   668  
   669  	hds = append(c.xGoogHeaders, hds...)
   670  	hds = append(hds, "Content-Type", "application/json")
   671  	headers := gax.BuildHeaders(ctx, hds...)
   672  	opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
   673  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
   674  	resp := &storagepb.SplitReadStreamResponse{}
   675  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   676  		if settings.Path != "" {
   677  			baseUrl.Path = settings.Path
   678  		}
   679  		httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
   680  		if err != nil {
   681  			return err
   682  		}
   683  		httpReq = httpReq.WithContext(ctx)
   684  		httpReq.Header = headers
   685  
   686  		httpRsp, err := c.httpClient.Do(httpReq)
   687  		if err != nil {
   688  			return err
   689  		}
   690  		defer httpRsp.Body.Close()
   691  
   692  		if err = googleapi.CheckResponse(httpRsp); err != nil {
   693  			return err
   694  		}
   695  
   696  		buf, err := io.ReadAll(httpRsp.Body)
   697  		if err != nil {
   698  			return err
   699  		}
   700  
   701  		if err := unm.Unmarshal(buf, resp); err != nil {
   702  			return err
   703  		}
   704  
   705  		return nil
   706  	}, opts...)
   707  	if e != nil {
   708  		return nil, e
   709  	}
   710  	return resp, nil
   711  }
   712  

View as plain text