...

Source file src/cloud.google.com/go/bigquery/storage/apiv1/big_query_read_client.go

Documentation: cloud.google.com/go/bigquery/storage/apiv1

     1  // Copyright 2024 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Code generated by protoc-gen-go_gapic. DO NOT EDIT.
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"net/url"
    24  	"time"
    25  
    26  	storagepb "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    27  	gax "github.com/googleapis/gax-go/v2"
    28  	"google.golang.org/api/option"
    29  	"google.golang.org/api/option/internaloption"
    30  	gtransport "google.golang.org/api/transport/grpc"
    31  	"google.golang.org/grpc"
    32  	"google.golang.org/grpc/codes"
    33  )
    34  
    35  var newBigQueryReadClientHook clientHook
    36  
    37  // BigQueryReadCallOptions contains the retry settings for each method of BigQueryReadClient.
    38  type BigQueryReadCallOptions struct {
    39  	CreateReadSession []gax.CallOption
    40  	ReadRows          []gax.CallOption
    41  	SplitReadStream   []gax.CallOption
    42  }
    43  
    44  func defaultBigQueryReadGRPCClientOptions() []option.ClientOption {
    45  	return []option.ClientOption{
    46  		internaloption.WithDefaultEndpoint("bigquerystorage.googleapis.com:443"),
    47  		internaloption.WithDefaultEndpointTemplate("bigquerystorage.UNIVERSE_DOMAIN:443"),
    48  		internaloption.WithDefaultMTLSEndpoint("bigquerystorage.mtls.googleapis.com:443"),
    49  		internaloption.WithDefaultUniverseDomain("googleapis.com"),
    50  		internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
    51  		internaloption.WithDefaultScopes(DefaultAuthScopes()...),
    52  		internaloption.EnableJwtWithScope(),
    53  		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
    54  			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
    55  	}
    56  }
    57  
    58  func defaultBigQueryReadCallOptions() *BigQueryReadCallOptions {
    59  	return &BigQueryReadCallOptions{
    60  		CreateReadSession: []gax.CallOption{
    61  			gax.WithTimeout(600000 * time.Millisecond),
    62  			gax.WithRetry(func() gax.Retryer {
    63  				return gax.OnCodes([]codes.Code{
    64  					codes.DeadlineExceeded,
    65  					codes.Unavailable,
    66  				}, gax.Backoff{
    67  					Initial:    100 * time.Millisecond,
    68  					Max:        60000 * time.Millisecond,
    69  					Multiplier: 1.30,
    70  				})
    71  			}),
    72  		},
    73  		ReadRows: []gax.CallOption{
    74  			gax.WithRetry(func() gax.Retryer {
    75  				return gax.OnCodes([]codes.Code{
    76  					codes.Unavailable,
    77  				}, gax.Backoff{
    78  					Initial:    100 * time.Millisecond,
    79  					Max:        60000 * time.Millisecond,
    80  					Multiplier: 1.30,
    81  				})
    82  			}),
    83  		},
    84  		SplitReadStream: []gax.CallOption{
    85  			gax.WithTimeout(600000 * time.Millisecond),
    86  			gax.WithRetry(func() gax.Retryer {
    87  				return gax.OnCodes([]codes.Code{
    88  					codes.DeadlineExceeded,
    89  					codes.Unavailable,
    90  				}, gax.Backoff{
    91  					Initial:    100 * time.Millisecond,
    92  					Max:        60000 * time.Millisecond,
    93  					Multiplier: 1.30,
    94  				})
    95  			}),
    96  		},
    97  	}
    98  }
    99  
   100  // internalBigQueryReadClient is an interface that defines the methods available from BigQuery Storage API.
   101  type internalBigQueryReadClient interface {
   102  	Close() error
   103  	setGoogleClientInfo(...string)
   104  	Connection() *grpc.ClientConn
   105  	CreateReadSession(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error)
   106  	ReadRows(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error)
   107  	SplitReadStream(context.Context, *storagepb.SplitReadStreamRequest, ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error)
   108  }
   109  
   110  // BigQueryReadClient is a client for interacting with BigQuery Storage API.
   111  // Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
   112  //
   113  // BigQuery Read API.
   114  //
   115  // The Read API can be used to read data from BigQuery.
   116  type BigQueryReadClient struct {
   117  	// The internal transport-dependent client.
   118  	internalClient internalBigQueryReadClient
   119  
   120  	// The call options for this service.
   121  	CallOptions *BigQueryReadCallOptions
   122  }
   123  
   124  // Wrapper methods routed to the internal client.
   125  
   126  // Close closes the connection to the API service. The user should invoke this when
   127  // the client is no longer required.
   128  func (c *BigQueryReadClient) Close() error {
   129  	return c.internalClient.Close()
   130  }
   131  
   132  // setGoogleClientInfo sets the name and version of the application in
   133  // the `x-goog-api-client` header passed on each request. Intended for
   134  // use by Google-written clients.
   135  func (c *BigQueryReadClient) setGoogleClientInfo(keyval ...string) {
   136  	c.internalClient.setGoogleClientInfo(keyval...)
   137  }
   138  
   139  // Connection returns a connection to the API service.
   140  //
   141  // Deprecated: Connections are now pooled so this method does not always
   142  // return the same resource.
   143  func (c *BigQueryReadClient) Connection() *grpc.ClientConn {
   144  	return c.internalClient.Connection()
   145  }
   146  
   147  // CreateReadSession creates a new read session. A read session divides the contents of a
   148  // BigQuery table into one or more streams, which can then be used to read
   149  // data from the table. The read session also specifies properties of the
   150  // data to be read, such as a list of columns or a push-down filter describing
   151  // the rows to be returned.
   152  //
   153  // A particular row can be read by at most one stream. When the caller has
   154  // reached the end of each stream in the session, then all the data in the
   155  // table has been read.
   156  //
   157  // Data is assigned to each stream such that roughly the same number of
   158  // rows can be read from each stream. Because the server-side unit for
   159  // assigning data is collections of rows, the API does not guarantee that
   160  // each stream will return the same number or rows. Additionally, the
   161  // limits are enforced based on the number of pre-filtered rows, so some
   162  // filters can lead to lopsided assignments.
   163  //
   164  // Read sessions automatically expire 6 hours after they are created and do
   165  // not require manual clean-up by the caller.
   166  func (c *BigQueryReadClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
   167  	return c.internalClient.CreateReadSession(ctx, req, opts...)
   168  }
   169  
   170  // ReadRows reads rows from the stream in the format prescribed by the ReadSession.
   171  // Each response contains one or more table rows, up to a maximum of 100 MiB
   172  // per response; read requests which attempt to read individual rows larger
   173  // than 100 MiB will fail.
   174  //
   175  // Each request also returns a set of stream statistics reflecting the current
   176  // state of the stream.
   177  func (c *BigQueryReadClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
   178  	return c.internalClient.ReadRows(ctx, req, opts...)
   179  }
   180  
   181  // SplitReadStream splits a given ReadStream into two ReadStream objects. These
   182  // ReadStream objects are referred to as the primary and the residual
   183  // streams of the split. The original ReadStream can still be read from in
   184  // the same manner as before. Both of the returned ReadStream objects can
   185  // also be read from, and the rows returned by both child streams will be
   186  // the same as the rows read from the original stream.
   187  //
   188  // Moreover, the two child streams will be allocated back-to-back in the
   189  // original ReadStream. Concretely, it is guaranteed that for streams
   190  // original, primary, and residual, that original[0-j] = primary[0-j] and
   191  // original[j-n] = residual[0-m] once the streams have been read to
   192  // completion.
   193  func (c *BigQueryReadClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
   194  	return c.internalClient.SplitReadStream(ctx, req, opts...)
   195  }
   196  
   197  // bigQueryReadGRPCClient is a client for interacting with BigQuery Storage API over gRPC transport.
   198  //
   199  // Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
   200  type bigQueryReadGRPCClient struct {
   201  	// Connection pool of gRPC connections to the service.
   202  	connPool gtransport.ConnPool
   203  
   204  	// Points back to the CallOptions field of the containing BigQueryReadClient
   205  	CallOptions **BigQueryReadCallOptions
   206  
   207  	// The gRPC API client.
   208  	bigQueryReadClient storagepb.BigQueryReadClient
   209  
   210  	// The x-goog-* metadata to be sent with each request.
   211  	xGoogHeaders []string
   212  }
   213  
   214  // NewBigQueryReadClient creates a new big query read client based on gRPC.
   215  // The returned client must be Closed when it is done being used to clean up its underlying connections.
   216  //
   217  // BigQuery Read API.
   218  //
   219  // The Read API can be used to read data from BigQuery.
   220  func NewBigQueryReadClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryReadClient, error) {
   221  	clientOpts := defaultBigQueryReadGRPCClientOptions()
   222  	if newBigQueryReadClientHook != nil {
   223  		hookOpts, err := newBigQueryReadClientHook(ctx, clientHookParams{})
   224  		if err != nil {
   225  			return nil, err
   226  		}
   227  		clientOpts = append(clientOpts, hookOpts...)
   228  	}
   229  
   230  	connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
   231  	if err != nil {
   232  		return nil, err
   233  	}
   234  	client := BigQueryReadClient{CallOptions: defaultBigQueryReadCallOptions()}
   235  
   236  	c := &bigQueryReadGRPCClient{
   237  		connPool:           connPool,
   238  		bigQueryReadClient: storagepb.NewBigQueryReadClient(connPool),
   239  		CallOptions:        &client.CallOptions,
   240  	}
   241  	c.setGoogleClientInfo()
   242  
   243  	client.internalClient = c
   244  
   245  	return &client, nil
   246  }
   247  
   248  // Connection returns a connection to the API service.
   249  //
   250  // Deprecated: Connections are now pooled so this method does not always
   251  // return the same resource.
   252  func (c *bigQueryReadGRPCClient) Connection() *grpc.ClientConn {
   253  	return c.connPool.Conn()
   254  }
   255  
   256  // setGoogleClientInfo sets the name and version of the application in
   257  // the `x-goog-api-client` header passed on each request. Intended for
   258  // use by Google-written clients.
   259  func (c *bigQueryReadGRPCClient) setGoogleClientInfo(keyval ...string) {
   260  	kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
   261  	kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
   262  	c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
   263  }
   264  
   265  // Close closes the connection to the API service. The user should invoke this when
   266  // the client is no longer required.
   267  func (c *bigQueryReadGRPCClient) Close() error {
   268  	return c.connPool.Close()
   269  }
   270  
   271  func (c *bigQueryReadGRPCClient) CreateReadSession(ctx context.Context, req *storagepb.CreateReadSessionRequest, opts ...gax.CallOption) (*storagepb.ReadSession, error) {
   272  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_session.table", url.QueryEscape(req.GetReadSession().GetTable()))}
   273  
   274  	hds = append(c.xGoogHeaders, hds...)
   275  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   276  	opts = append((*c.CallOptions).CreateReadSession[0:len((*c.CallOptions).CreateReadSession):len((*c.CallOptions).CreateReadSession)], opts...)
   277  	var resp *storagepb.ReadSession
   278  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   279  		var err error
   280  		resp, err = c.bigQueryReadClient.CreateReadSession(ctx, req, settings.GRPC...)
   281  		return err
   282  	}, opts...)
   283  	if err != nil {
   284  		return nil, err
   285  	}
   286  	return resp, nil
   287  }
   288  
   289  func (c *bigQueryReadGRPCClient) ReadRows(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
   290  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "read_stream", url.QueryEscape(req.GetReadStream()))}
   291  
   292  	hds = append(c.xGoogHeaders, hds...)
   293  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   294  	opts = append((*c.CallOptions).ReadRows[0:len((*c.CallOptions).ReadRows):len((*c.CallOptions).ReadRows)], opts...)
   295  	var resp storagepb.BigQueryRead_ReadRowsClient
   296  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   297  		var err error
   298  		resp, err = c.bigQueryReadClient.ReadRows(ctx, req, settings.GRPC...)
   299  		return err
   300  	}, opts...)
   301  	if err != nil {
   302  		return nil, err
   303  	}
   304  	return resp, nil
   305  }
   306  
   307  func (c *bigQueryReadGRPCClient) SplitReadStream(ctx context.Context, req *storagepb.SplitReadStreamRequest, opts ...gax.CallOption) (*storagepb.SplitReadStreamResponse, error) {
   308  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
   309  
   310  	hds = append(c.xGoogHeaders, hds...)
   311  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   312  	opts = append((*c.CallOptions).SplitReadStream[0:len((*c.CallOptions).SplitReadStream):len((*c.CallOptions).SplitReadStream)], opts...)
   313  	var resp *storagepb.SplitReadStreamResponse
   314  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   315  		var err error
   316  		resp, err = c.bigQueryReadClient.SplitReadStream(ctx, req, settings.GRPC...)
   317  		return err
   318  	}, opts...)
   319  	if err != nil {
   320  		return nil, err
   321  	}
   322  	return resp, nil
   323  }
   324  

View as plain text