...

Source file src/cloud.google.com/go/bigquery/storage/apiv1/big_query_write_client.go

Documentation: cloud.google.com/go/bigquery/storage/apiv1

     1  // Copyright 2024 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Code generated by protoc-gen-go_gapic. DO NOT EDIT.
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"net/url"
    24  	"time"
    25  
    26  	storagepb "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    27  	gax "github.com/googleapis/gax-go/v2"
    28  	"google.golang.org/api/option"
    29  	"google.golang.org/api/option/internaloption"
    30  	gtransport "google.golang.org/api/transport/grpc"
    31  	"google.golang.org/grpc"
    32  	"google.golang.org/grpc/codes"
    33  )
    34  
    35  var newBigQueryWriteClientHook clientHook
    36  
    37  // BigQueryWriteCallOptions contains the retry settings for each method of BigQueryWriteClient.
    38  type BigQueryWriteCallOptions struct {
    39  	CreateWriteStream       []gax.CallOption
    40  	AppendRows              []gax.CallOption
    41  	GetWriteStream          []gax.CallOption
    42  	FinalizeWriteStream     []gax.CallOption
    43  	BatchCommitWriteStreams []gax.CallOption
    44  	FlushRows               []gax.CallOption
    45  }
    46  
    47  func defaultBigQueryWriteGRPCClientOptions() []option.ClientOption {
    48  	return []option.ClientOption{
    49  		internaloption.WithDefaultEndpoint("bigquerystorage.googleapis.com:443"),
    50  		internaloption.WithDefaultEndpointTemplate("bigquerystorage.UNIVERSE_DOMAIN:443"),
    51  		internaloption.WithDefaultMTLSEndpoint("bigquerystorage.mtls.googleapis.com:443"),
    52  		internaloption.WithDefaultUniverseDomain("googleapis.com"),
    53  		internaloption.WithDefaultAudience("https://bigquerystorage.googleapis.com/"),
    54  		internaloption.WithDefaultScopes(DefaultAuthScopes()...),
    55  		internaloption.EnableJwtWithScope(),
    56  		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
    57  			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
    58  	}
    59  }
    60  
    61  func defaultBigQueryWriteCallOptions() *BigQueryWriteCallOptions {
    62  	return &BigQueryWriteCallOptions{
    63  		CreateWriteStream: []gax.CallOption{
    64  			gax.WithTimeout(1200000 * time.Millisecond),
    65  			gax.WithRetry(func() gax.Retryer {
    66  				return gax.OnCodes([]codes.Code{
    67  					codes.DeadlineExceeded,
    68  					codes.Unavailable,
    69  					codes.ResourceExhausted,
    70  				}, gax.Backoff{
    71  					Initial:    10000 * time.Millisecond,
    72  					Max:        120000 * time.Millisecond,
    73  					Multiplier: 1.30,
    74  				})
    75  			}),
    76  		},
    77  		AppendRows: []gax.CallOption{
    78  			gax.WithRetry(func() gax.Retryer {
    79  				return gax.OnCodes([]codes.Code{
    80  					codes.Unavailable,
    81  				}, gax.Backoff{
    82  					Initial:    100 * time.Millisecond,
    83  					Max:        60000 * time.Millisecond,
    84  					Multiplier: 1.30,
    85  				})
    86  			}),
    87  		},
    88  		GetWriteStream: []gax.CallOption{
    89  			gax.WithTimeout(600000 * time.Millisecond),
    90  			gax.WithRetry(func() gax.Retryer {
    91  				return gax.OnCodes([]codes.Code{
    92  					codes.DeadlineExceeded,
    93  					codes.Unavailable,
    94  					codes.ResourceExhausted,
    95  				}, gax.Backoff{
    96  					Initial:    100 * time.Millisecond,
    97  					Max:        60000 * time.Millisecond,
    98  					Multiplier: 1.30,
    99  				})
   100  			}),
   101  		},
   102  		FinalizeWriteStream: []gax.CallOption{
   103  			gax.WithTimeout(600000 * time.Millisecond),
   104  			gax.WithRetry(func() gax.Retryer {
   105  				return gax.OnCodes([]codes.Code{
   106  					codes.DeadlineExceeded,
   107  					codes.Unavailable,
   108  					codes.ResourceExhausted,
   109  				}, gax.Backoff{
   110  					Initial:    100 * time.Millisecond,
   111  					Max:        60000 * time.Millisecond,
   112  					Multiplier: 1.30,
   113  				})
   114  			}),
   115  		},
   116  		BatchCommitWriteStreams: []gax.CallOption{
   117  			gax.WithTimeout(600000 * time.Millisecond),
   118  			gax.WithRetry(func() gax.Retryer {
   119  				return gax.OnCodes([]codes.Code{
   120  					codes.DeadlineExceeded,
   121  					codes.Unavailable,
   122  					codes.ResourceExhausted,
   123  				}, gax.Backoff{
   124  					Initial:    100 * time.Millisecond,
   125  					Max:        60000 * time.Millisecond,
   126  					Multiplier: 1.30,
   127  				})
   128  			}),
   129  		},
   130  		FlushRows: []gax.CallOption{
   131  			gax.WithTimeout(600000 * time.Millisecond),
   132  			gax.WithRetry(func() gax.Retryer {
   133  				return gax.OnCodes([]codes.Code{
   134  					codes.DeadlineExceeded,
   135  					codes.Unavailable,
   136  					codes.ResourceExhausted,
   137  				}, gax.Backoff{
   138  					Initial:    100 * time.Millisecond,
   139  					Max:        60000 * time.Millisecond,
   140  					Multiplier: 1.30,
   141  				})
   142  			}),
   143  		},
   144  	}
   145  }
   146  
   147  // internalBigQueryWriteClient is an interface that defines the methods available from BigQuery Storage API.
   148  type internalBigQueryWriteClient interface {
   149  	Close() error
   150  	setGoogleClientInfo(...string)
   151  	Connection() *grpc.ClientConn
   152  	CreateWriteStream(context.Context, *storagepb.CreateWriteStreamRequest, ...gax.CallOption) (*storagepb.WriteStream, error)
   153  	AppendRows(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
   154  	GetWriteStream(context.Context, *storagepb.GetWriteStreamRequest, ...gax.CallOption) (*storagepb.WriteStream, error)
   155  	FinalizeWriteStream(context.Context, *storagepb.FinalizeWriteStreamRequest, ...gax.CallOption) (*storagepb.FinalizeWriteStreamResponse, error)
   156  	BatchCommitWriteStreams(context.Context, *storagepb.BatchCommitWriteStreamsRequest, ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error)
   157  	FlushRows(context.Context, *storagepb.FlushRowsRequest, ...gax.CallOption) (*storagepb.FlushRowsResponse, error)
   158  }
   159  
   160  // BigQueryWriteClient is a client for interacting with BigQuery Storage API.
   161  // Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
   162  //
   163  // BigQuery Write API.
   164  //
   165  // The Write API can be used to write data to BigQuery.
   166  //
   167  // For supplementary information about the Write API, see:
   168  // https://cloud.google.com/bigquery/docs/write-api (at https://cloud.google.com/bigquery/docs/write-api)
   169  type BigQueryWriteClient struct {
   170  	// The internal transport-dependent client.
   171  	internalClient internalBigQueryWriteClient
   172  
   173  	// The call options for this service.
   174  	CallOptions *BigQueryWriteCallOptions
   175  }
   176  
   177  // Wrapper methods routed to the internal client.
   178  
   179  // Close closes the connection to the API service. The user should invoke this when
   180  // the client is no longer required.
   181  func (c *BigQueryWriteClient) Close() error {
   182  	return c.internalClient.Close()
   183  }
   184  
   185  // setGoogleClientInfo sets the name and version of the application in
   186  // the `x-goog-api-client` header passed on each request. Intended for
   187  // use by Google-written clients.
   188  func (c *BigQueryWriteClient) setGoogleClientInfo(keyval ...string) {
   189  	c.internalClient.setGoogleClientInfo(keyval...)
   190  }
   191  
   192  // Connection returns a connection to the API service.
   193  //
   194  // Deprecated: Connections are now pooled so this method does not always
   195  // return the same resource.
   196  func (c *BigQueryWriteClient) Connection() *grpc.ClientConn {
   197  	return c.internalClient.Connection()
   198  }
   199  
   200  // CreateWriteStream creates a write stream to the given table.
   201  // Additionally, every table has a special stream named ‘_default’
   202  // to which data can be written. This stream doesn’t need to be created using
   203  // CreateWriteStream. It is a stream that can be used simultaneously by any
   204  // number of clients. Data written to this stream is considered committed as
   205  // soon as an acknowledgement is received.
   206  func (c *BigQueryWriteClient) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
   207  	return c.internalClient.CreateWriteStream(ctx, req, opts...)
   208  }
   209  
   210  // AppendRows appends data to the given stream.
   211  //
   212  // If offset is specified, the offset is checked against the end of
   213  // stream. The server returns OUT_OF_RANGE in AppendRowsResponse if an
   214  // attempt is made to append to an offset beyond the current end of the stream
   215  // or ALREADY_EXISTS if user provides an offset that has already been
   216  // written to. User can retry with adjusted offset within the same RPC
   217  // connection. If offset is not specified, append happens at the end of the
   218  // stream.
   219  //
   220  // The response contains an optional offset at which the append
   221  // happened.  No offset information will be returned for appends to a
   222  // default stream.
   223  //
   224  // Responses are received in the same order in which requests are sent.
   225  // There will be one response for each successful inserted request.  Responses
   226  // may optionally embed error information if the originating AppendRequest was
   227  // not successfully processed.
   228  //
   229  // The specifics of when successfully appended data is made visible to the
   230  // table are governed by the type of stream:
   231  //
   232  //	For COMMITTED streams (which includes the default stream), data is
   233  //	visible immediately upon successful append.
   234  //
   235  //	For BUFFERED streams, data is made visible via a subsequent FlushRows
   236  //	rpc which advances a cursor to a newer offset in the stream.
   237  //
   238  //	For PENDING streams, data is not made visible until the stream itself is
   239  //	finalized (via the FinalizeWriteStream rpc), and the stream is explicitly
   240  //	committed via the BatchCommitWriteStreams rpc.
   241  func (c *BigQueryWriteClient) AppendRows(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
   242  	return c.internalClient.AppendRows(ctx, opts...)
   243  }
   244  
   245  // GetWriteStream gets information about a write stream.
   246  func (c *BigQueryWriteClient) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
   247  	return c.internalClient.GetWriteStream(ctx, req, opts...)
   248  }
   249  
   250  // FinalizeWriteStream finalize a write stream so that no new data can be appended to the
   251  // stream. Finalize is not supported on the ‘_default’ stream.
   252  func (c *BigQueryWriteClient) FinalizeWriteStream(ctx context.Context, req *storagepb.FinalizeWriteStreamRequest, opts ...gax.CallOption) (*storagepb.FinalizeWriteStreamResponse, error) {
   253  	return c.internalClient.FinalizeWriteStream(ctx, req, opts...)
   254  }
   255  
   256  // BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same
   257  // parent table.
   258  //
   259  // Streams must be finalized before commit and cannot be committed multiple
   260  // times. Once a stream is committed, data in the stream becomes available
   261  // for read operations.
   262  func (c *BigQueryWriteClient) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
   263  	return c.internalClient.BatchCommitWriteStreams(ctx, req, opts...)
   264  }
   265  
   266  // FlushRows flushes rows to a BUFFERED stream.
   267  //
   268  // If users are appending rows to BUFFERED stream, flush operation is
   269  // required in order for the rows to become available for reading. A
   270  // Flush operation flushes up to any previously flushed offset in a BUFFERED
   271  // stream, to the offset specified in the request.
   272  //
   273  // Flush is not supported on the _default stream, since it is not BUFFERED.
   274  func (c *BigQueryWriteClient) FlushRows(ctx context.Context, req *storagepb.FlushRowsRequest, opts ...gax.CallOption) (*storagepb.FlushRowsResponse, error) {
   275  	return c.internalClient.FlushRows(ctx, req, opts...)
   276  }
   277  
   278  // bigQueryWriteGRPCClient is a client for interacting with BigQuery Storage API over gRPC transport.
   279  //
   280  // Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
   281  type bigQueryWriteGRPCClient struct {
   282  	// Connection pool of gRPC connections to the service.
   283  	connPool gtransport.ConnPool
   284  
   285  	// Points back to the CallOptions field of the containing BigQueryWriteClient
   286  	CallOptions **BigQueryWriteCallOptions
   287  
   288  	// The gRPC API client.
   289  	bigQueryWriteClient storagepb.BigQueryWriteClient
   290  
   291  	// The x-goog-* metadata to be sent with each request.
   292  	xGoogHeaders []string
   293  }
   294  
   295  // NewBigQueryWriteClient creates a new big query write client based on gRPC.
   296  // The returned client must be Closed when it is done being used to clean up its underlying connections.
   297  //
   298  // BigQuery Write API.
   299  //
   300  // The Write API can be used to write data to BigQuery.
   301  //
   302  // For supplementary information about the Write API, see:
   303  // https://cloud.google.com/bigquery/docs/write-api (at https://cloud.google.com/bigquery/docs/write-api)
   304  func NewBigQueryWriteClient(ctx context.Context, opts ...option.ClientOption) (*BigQueryWriteClient, error) {
   305  	clientOpts := defaultBigQueryWriteGRPCClientOptions()
   306  	if newBigQueryWriteClientHook != nil {
   307  		hookOpts, err := newBigQueryWriteClientHook(ctx, clientHookParams{})
   308  		if err != nil {
   309  			return nil, err
   310  		}
   311  		clientOpts = append(clientOpts, hookOpts...)
   312  	}
   313  
   314  	connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
   315  	if err != nil {
   316  		return nil, err
   317  	}
   318  	client := BigQueryWriteClient{CallOptions: defaultBigQueryWriteCallOptions()}
   319  
   320  	c := &bigQueryWriteGRPCClient{
   321  		connPool:            connPool,
   322  		bigQueryWriteClient: storagepb.NewBigQueryWriteClient(connPool),
   323  		CallOptions:         &client.CallOptions,
   324  	}
   325  	c.setGoogleClientInfo()
   326  
   327  	client.internalClient = c
   328  
   329  	return &client, nil
   330  }
   331  
   332  // Connection returns a connection to the API service.
   333  //
   334  // Deprecated: Connections are now pooled so this method does not always
   335  // return the same resource.
   336  func (c *bigQueryWriteGRPCClient) Connection() *grpc.ClientConn {
   337  	return c.connPool.Conn()
   338  }
   339  
   340  // setGoogleClientInfo sets the name and version of the application in
   341  // the `x-goog-api-client` header passed on each request. Intended for
   342  // use by Google-written clients.
   343  func (c *bigQueryWriteGRPCClient) setGoogleClientInfo(keyval ...string) {
   344  	kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
   345  	kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
   346  	c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
   347  }
   348  
   349  // Close closes the connection to the API service. The user should invoke this when
   350  // the client is no longer required.
   351  func (c *bigQueryWriteGRPCClient) Close() error {
   352  	return c.connPool.Close()
   353  }
   354  
   355  func (c *bigQueryWriteGRPCClient) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
   356  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "parent", url.QueryEscape(req.GetParent()))}
   357  
   358  	hds = append(c.xGoogHeaders, hds...)
   359  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   360  	opts = append((*c.CallOptions).CreateWriteStream[0:len((*c.CallOptions).CreateWriteStream):len((*c.CallOptions).CreateWriteStream)], opts...)
   361  	var resp *storagepb.WriteStream
   362  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   363  		var err error
   364  		resp, err = c.bigQueryWriteClient.CreateWriteStream(ctx, req, settings.GRPC...)
   365  		return err
   366  	}, opts...)
   367  	if err != nil {
   368  		return nil, err
   369  	}
   370  	return resp, nil
   371  }
   372  
   373  func (c *bigQueryWriteGRPCClient) AppendRows(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
   374  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, c.xGoogHeaders...)
   375  	var resp storagepb.BigQueryWrite_AppendRowsClient
   376  	opts = append((*c.CallOptions).AppendRows[0:len((*c.CallOptions).AppendRows):len((*c.CallOptions).AppendRows)], opts...)
   377  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   378  		var err error
   379  		resp, err = c.bigQueryWriteClient.AppendRows(ctx, settings.GRPC...)
   380  		return err
   381  	}, opts...)
   382  	if err != nil {
   383  		return nil, err
   384  	}
   385  	return resp, nil
   386  }
   387  
   388  func (c *bigQueryWriteGRPCClient) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
   389  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
   390  
   391  	hds = append(c.xGoogHeaders, hds...)
   392  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   393  	opts = append((*c.CallOptions).GetWriteStream[0:len((*c.CallOptions).GetWriteStream):len((*c.CallOptions).GetWriteStream)], opts...)
   394  	var resp *storagepb.WriteStream
   395  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   396  		var err error
   397  		resp, err = c.bigQueryWriteClient.GetWriteStream(ctx, req, settings.GRPC...)
   398  		return err
   399  	}, opts...)
   400  	if err != nil {
   401  		return nil, err
   402  	}
   403  	return resp, nil
   404  }
   405  
   406  func (c *bigQueryWriteGRPCClient) FinalizeWriteStream(ctx context.Context, req *storagepb.FinalizeWriteStreamRequest, opts ...gax.CallOption) (*storagepb.FinalizeWriteStreamResponse, error) {
   407  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
   408  
   409  	hds = append(c.xGoogHeaders, hds...)
   410  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   411  	opts = append((*c.CallOptions).FinalizeWriteStream[0:len((*c.CallOptions).FinalizeWriteStream):len((*c.CallOptions).FinalizeWriteStream)], opts...)
   412  	var resp *storagepb.FinalizeWriteStreamResponse
   413  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   414  		var err error
   415  		resp, err = c.bigQueryWriteClient.FinalizeWriteStream(ctx, req, settings.GRPC...)
   416  		return err
   417  	}, opts...)
   418  	if err != nil {
   419  		return nil, err
   420  	}
   421  	return resp, nil
   422  }
   423  
   424  func (c *bigQueryWriteGRPCClient) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
   425  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "parent", url.QueryEscape(req.GetParent()))}
   426  
   427  	hds = append(c.xGoogHeaders, hds...)
   428  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   429  	opts = append((*c.CallOptions).BatchCommitWriteStreams[0:len((*c.CallOptions).BatchCommitWriteStreams):len((*c.CallOptions).BatchCommitWriteStreams)], opts...)
   430  	var resp *storagepb.BatchCommitWriteStreamsResponse
   431  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   432  		var err error
   433  		resp, err = c.bigQueryWriteClient.BatchCommitWriteStreams(ctx, req, settings.GRPC...)
   434  		return err
   435  	}, opts...)
   436  	if err != nil {
   437  		return nil, err
   438  	}
   439  	return resp, nil
   440  }
   441  
   442  func (c *bigQueryWriteGRPCClient) FlushRows(ctx context.Context, req *storagepb.FlushRowsRequest, opts ...gax.CallOption) (*storagepb.FlushRowsResponse, error) {
   443  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "write_stream", url.QueryEscape(req.GetWriteStream()))}
   444  
   445  	hds = append(c.xGoogHeaders, hds...)
   446  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   447  	opts = append((*c.CallOptions).FlushRows[0:len((*c.CallOptions).FlushRows):len((*c.CallOptions).FlushRows)], opts...)
   448  	var resp *storagepb.FlushRowsResponse
   449  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   450  		var err error
   451  		resp, err = c.bigQueryWriteClient.FlushRows(ctx, req, settings.GRPC...)
   452  		return err
   453  	}, opts...)
   454  	if err != nil {
   455  		return nil, err
   456  	}
   457  	return resp, nil
   458  }
   459  

View as plain text