...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/options.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    19  	"github.com/googleapis/gax-go/v2"
    20  	"google.golang.org/api/option"
    21  	"google.golang.org/api/option/internaloption"
    22  	"google.golang.org/protobuf/types/descriptorpb"
    23  	"google.golang.org/protobuf/types/known/wrapperspb"
    24  )
    25  
// writerClientConfig encapsulates custom client-level config settings.
//
// Values are written by the package's writerClientOption implementations and
// then normalized by newWriterClientConfig.
type writerClientConfig struct {
	// useMultiplex records whether WithMultiplexing was supplied.
	useMultiplex bool
	// maxMultiplexPoolSize is the pool limit set via WithMultiplexPoolLimit.
	// newWriterClientConfig raises it to at least 1 when useMultiplex is set.
	maxMultiplexPoolSize int
	// defaultInflightRequests is the value set via WithDefaultInflightRequests.
	// Negative values are normalized to zero by newWriterClientConfig.
	defaultInflightRequests int
	// defaultInflightBytes is the value set via WithDefaultInflightBytes.
	// Negative values are normalized to zero by newWriterClientConfig.
	defaultInflightBytes int
	// defaultAppendRowsCallOptions accumulates, in order, every call option
	// supplied via WithDefaultAppendRowsCallOption.
	defaultAppendRowsCallOptions []gax.CallOption
}
    34  
    35  // newWriterClientConfig builds a client config based on package-specific custom ClientOptions.
    36  func newWriterClientConfig(opts ...option.ClientOption) *writerClientConfig {
    37  	conf := &writerClientConfig{}
    38  	for _, opt := range opts {
    39  		if wOpt, ok := opt.(writerClientOption); ok {
    40  			wOpt.ApplyWriterOpt(conf)
    41  		}
    42  	}
    43  
    44  	// Normalize the config to ensure we're dealing with sane values.
    45  	if conf.useMultiplex {
    46  		if conf.maxMultiplexPoolSize < 1 {
    47  			conf.maxMultiplexPoolSize = 1
    48  		}
    49  	}
    50  	if conf.defaultInflightBytes < 0 {
    51  		conf.defaultInflightBytes = 0
    52  	}
    53  	if conf.defaultInflightRequests < 0 {
    54  		conf.defaultInflightRequests = 0
    55  	}
    56  	return conf
    57  }
    58  
// writerClientOption allows us to extend ClientOptions for client-specific needs.
//
// An option implementing this interface can be passed anywhere an
// option.ClientOption is accepted; newWriterClientConfig detects it via a type
// assertion and invokes ApplyWriterOpt.
type writerClientOption interface {
	option.ClientOption
	// ApplyWriterOpt records this option's setting on the supplied config.
	ApplyWriterOpt(*writerClientConfig)
}
    64  
    65  // WithMultiplexing is an EXPERIMENTAL option that controls connection sharing
    66  // when instantiating the Client.  Only writes to default streams can leverage the
    67  // multiplex pool.  Internally, the client maintains a pool of connections per BigQuery
    68  // destination region, and will grow the pool to it's maximum allowed size if there's
    69  // sufficient traffic on the shared connection(s).
    70  //
    71  // This ClientOption is EXPERIMENTAL and subject to change.
    72  func WithMultiplexing() option.ClientOption {
    73  	return &enableMultiplexSetting{useMultiplex: true}
    74  }
    75  
// enableMultiplexSetting is the ClientOption returned by WithMultiplexing.
// It embeds internaloption.EmbeddableAdapter so it can be passed as an
// option.ClientOption.
type enableMultiplexSetting struct {
	internaloption.EmbeddableAdapter
	// useMultiplex is the value copied onto the client config.
	useMultiplex bool
}

// ApplyWriterOpt copies the multiplexing flag onto the client config.
func (s *enableMultiplexSetting) ApplyWriterOpt(c *writerClientConfig) {
	c.useMultiplex = s.useMultiplex
}
    84  
    85  // WithMultiplexPoolLimit is an EXPERIMENTAL option that sets the maximum
    86  // shared multiplex pool size when instantiating the Client.  If multiplexing
    87  // is not enabled, this setting is ignored.  By default, the limit is a single
    88  // shared connection.  This limit is applied per destination region.
    89  //
    90  // This ClientOption is EXPERIMENTAL and subject to change.
    91  func WithMultiplexPoolLimit(maxSize int) option.ClientOption {
    92  	return &maxMultiplexPoolSizeSetting{maxSize: maxSize}
    93  }
    94  
// maxMultiplexPoolSizeSetting is the ClientOption returned by
// WithMultiplexPoolLimit.  It embeds internaloption.EmbeddableAdapter so it
// can be passed as an option.ClientOption.
type maxMultiplexPoolSizeSetting struct {
	internaloption.EmbeddableAdapter
	// maxSize is the requested pool limit, stored as supplied by the caller.
	maxSize int
}

// ApplyWriterOpt copies the requested pool limit onto the client config.
// No validation happens here; newWriterClientConfig normalizes the value.
func (s *maxMultiplexPoolSizeSetting) ApplyWriterOpt(c *writerClientConfig) {
	c.maxMultiplexPoolSize = s.maxSize
}
   103  
   104  // WithDefaultInflightRequests is an EXPERIMENTAL ClientOption for controlling
   105  // the default limit of how many individual AppendRows write requests can
   106  // be in flight on a connection at a time.  This limit is enforced on all connections
   107  // created by the instantiated Client.
   108  //
   109  // Note: the WithMaxInflightRequests WriterOption can still be used to control
   110  // the behavior for individual ManagedStream writers when not using multiplexing.
   111  //
   112  // This ClientOption is EXPERIMENTAL and subject to change.
   113  func WithDefaultInflightRequests(n int) option.ClientOption {
   114  	return &defaultInflightRequestsSetting{maxRequests: n}
   115  }
   116  
// defaultInflightRequestsSetting is the ClientOption returned by
// WithDefaultInflightRequests.  It embeds internaloption.EmbeddableAdapter so
// it can be passed as an option.ClientOption.
type defaultInflightRequestsSetting struct {
	internaloption.EmbeddableAdapter
	// maxRequests is the requested default, stored as supplied by the caller.
	maxRequests int
}

// ApplyWriterOpt copies the requested default request limit onto the client
// config.  No validation happens here; newWriterClientConfig normalizes the value.
func (s *defaultInflightRequestsSetting) ApplyWriterOpt(c *writerClientConfig) {
	c.defaultInflightRequests = s.maxRequests
}
   125  
   126  // WithDefaultInflightBytes is an EXPERIMENTAL ClientOption for controlling
   127  // the default byte limit for how many individual AppendRows write requests can
   128  // be in flight on a connection at a time.  This limit is enforced on all connections
   129  // created by the instantiated Client.
   130  //
   131  // Note: the WithMaxInflightBytes WriterOption can still be used to control
   132  // the behavior for individual ManagedStream writers when not using multiplexing.
   133  //
   134  // This ClientOption is EXPERIMENTAL and subject to change.
   135  func WithDefaultInflightBytes(n int) option.ClientOption {
   136  	return &defaultInflightBytesSetting{maxBytes: n}
   137  }
   138  
// defaultInflightBytesSetting is the ClientOption returned by
// WithDefaultInflightBytes.  It embeds internaloption.EmbeddableAdapter so it
// can be passed as an option.ClientOption.
type defaultInflightBytesSetting struct {
	internaloption.EmbeddableAdapter
	// maxBytes is the requested default, stored as supplied by the caller.
	maxBytes int
}

// ApplyWriterOpt copies the requested default byte limit onto the client
// config.  No validation happens here; newWriterClientConfig normalizes the value.
func (s *defaultInflightBytesSetting) ApplyWriterOpt(c *writerClientConfig) {
	c.defaultInflightBytes = s.maxBytes
}
   147  
   148  // WithDefaultAppendRowsCallOption is an EXPERIMENTAL ClientOption for controlling
   149  // the gax.CallOptions passed when opening the underlying AppendRows bidi
   150  // stream connections used by this library to communicate with the BigQuery
   151  // Storage service.  This option is propagated to all
   152  // connections created by the instantiated Client.
   153  //
   154  // Note: the WithAppendRowsCallOption WriterOption can still be used to control
   155  // the behavior for individual ManagedStream writers that don't participate
   156  // in multiplexing.
   157  //
   158  // This ClientOption is EXPERIMENTAL and subject to change.
   159  func WithDefaultAppendRowsCallOption(o gax.CallOption) option.ClientOption {
   160  	return &defaultAppendRowsCallOptionSetting{opt: o}
   161  }
   162  
// defaultAppendRowsCallOptionSetting is the ClientOption returned by
// WithDefaultAppendRowsCallOption.  It embeds internaloption.EmbeddableAdapter
// so it can be passed as an option.ClientOption.
type defaultAppendRowsCallOptionSetting struct {
	internaloption.EmbeddableAdapter
	// opt is the single call option carried by this setting.
	opt gax.CallOption
}

// ApplyWriterOpt appends the carried call option to the client config's
// defaults.  Unlike the scalar settings, repeated options accumulate rather
// than overwrite one another.
func (s *defaultAppendRowsCallOptionSetting) ApplyWriterOpt(c *writerClientConfig) {
	c.defaultAppendRowsCallOptions = append(c.defaultAppendRowsCallOptions, s.opt)
}
   171  
// WriterOption is a functional option used to configure a ManagedStream
// instance.  Each option receives the ManagedStream being configured and
// mutates it in place.
type WriterOption func(*ManagedStream)
   174  
   175  // WithType sets the stream type for the managed stream.
   176  func WithType(st StreamType) WriterOption {
   177  	return func(ms *ManagedStream) {
   178  		ms.streamSettings.streamType = st
   179  	}
   180  }
   181  
   182  // WithStreamName allows users to set the stream name this writer will
   183  // append to explicitly.  By default, the managed client will create the
   184  // stream when instantiated if necessary.
   185  //
   186  // Note:  Supplying this option causes other options which affect stream construction
   187  // such as WithStreamType and WithDestinationTable to be ignored.
   188  func WithStreamName(name string) WriterOption {
   189  	return func(ms *ManagedStream) {
   190  		ms.streamSettings.streamID = name
   191  	}
   192  }
   193  
   194  // WithDestinationTable specifies the destination table to which a created
   195  // stream will append rows.  Format of the table:
   196  //
   197  //	projects/{projectid}/datasets/{dataset}/tables/{table}
   198  func WithDestinationTable(destTable string) WriterOption {
   199  	return func(ms *ManagedStream) {
   200  		ms.streamSettings.destinationTable = destTable
   201  	}
   202  }
   203  
   204  // WithMaxInflightRequests bounds the inflight appends on the write connection.
   205  //
   206  // Note: See the WithDefaultInflightRequests ClientOption for setting a default
   207  // when instantiating a client, rather than setting this limit per-writer.
   208  // This WriterOption is ignored for ManagedStreams that participate in multiplexing.
   209  func WithMaxInflightRequests(n int) WriterOption {
   210  	return func(ms *ManagedStream) {
   211  		ms.streamSettings.MaxInflightRequests = n
   212  	}
   213  }
   214  
   215  // WithMaxInflightBytes bounds the inflight append request bytes on the write connection.
   216  //
   217  // Note: See the WithDefaultInflightBytes ClientOption for setting a default
   218  // when instantiating a client, rather than setting this limit per-writer.
   219  // This WriterOption is ignored for ManagedStreams that participate in multiplexing.
   220  func WithMaxInflightBytes(n int) WriterOption {
   221  	return func(ms *ManagedStream) {
   222  		ms.streamSettings.MaxInflightBytes = n
   223  	}
   224  }
   225  
   226  // WithTraceID allows instruments requests to the service with a custom trace prefix.
   227  // This is generally for diagnostic purposes only.
   228  func WithTraceID(traceID string) WriterOption {
   229  	return func(ms *ManagedStream) {
   230  		ms.streamSettings.TraceID = traceID
   231  	}
   232  }
   233  
   234  // WithSchemaDescriptor describes the format of the serialized data being sent by
   235  // AppendRows calls on the stream.
   236  func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption {
   237  	return func(ms *ManagedStream) {
   238  		ms.curTemplate = ms.curTemplate.revise(reviseProtoSchema(dp))
   239  	}
   240  }
   241  
   242  // WithMissingValueInterpretations controls how missing values are interpreted
   243  // for individual columns.
   244  //
   245  // You must provide a map to indicate how to interpret missing value for some fields. Missing
   246  // values are fields present in user schema but missing in rows. The key is
   247  // the field name. The value is the interpretation of missing values for the
   248  // field.
   249  //
   250  // For example, the following option would indicate that missing values in the "foo"
   251  // column are interpreted as null, whereas missing values in the "bar" column are
   252  // treated as the default value:
   253  //
   254  //	   WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
   255  //					"foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
   256  //					"bar": storagepb.AppendRowsRequest_NULL_VALUE,
   257  //		  })
   258  //
   259  // If a field is not in this map and has missing values, the missing values
   260  // in this field are interpreted as NULL unless overridden with a default missing
   261  // value interpretation.
   262  //
   263  // Currently, field name can only be top-level column name, can't be a struct
   264  // field path like 'foo.bar'.
   265  func WithMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption {
   266  	return func(ms *ManagedStream) {
   267  		ms.curTemplate = ms.curTemplate.revise(reviseMissingValueInterpretations(mvi))
   268  	}
   269  }
   270  
   271  // WithDefaultMissingValueInterpretation controls how missing values are interpreted by
   272  // for a given stream.  See WithMissingValueIntepretations for more information about
   273  // missing values.
   274  //
   275  // WithMissingValueIntepretations set for individual colums can override the default chosen
   276  // with this option.
   277  //
   278  // For example, if you want to write
   279  // `NULL` instead of using default values for some columns, you can set
   280  // `default_missing_value_interpretation` to `DEFAULT_VALUE` and at the same
   281  // time, set `missing_value_interpretations` to `NULL_VALUE` on those columns.
   282  func WithDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption {
   283  	return func(ms *ManagedStream) {
   284  		ms.curTemplate = ms.curTemplate.revise(reviseDefaultMissingValueInterpretation(def))
   285  	}
   286  }
   287  
   288  // WithDataOrigin is used to attach an origin context to the instrumentation metrics
   289  // emitted by the library.
   290  func WithDataOrigin(dataOrigin string) WriterOption {
   291  	return func(ms *ManagedStream) {
   292  		ms.streamSettings.dataOrigin = dataOrigin
   293  	}
   294  }
   295  
   296  // WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when
   297  // it opens the underlying append stream.
   298  //
   299  // Note: See the DefaultAppendRowsCallOption ClientOption for setting defaults
   300  // when instantiating a client, rather than setting this limit per-writer.  This WriterOption
   301  // is ignored for ManagedStream writers that participate in multiplexing.
   302  func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
   303  	return func(ms *ManagedStream) {
   304  		ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, o)
   305  	}
   306  }
   307  
   308  // EnableWriteRetries enables ManagedStream to automatically retry failed appends.
   309  //
   310  // Enabling retries is best suited for cases where users want to achieve at-least-once
   311  // append semantics.  Use of automatic retries may complicate patterns where the user
   312  // is designing for exactly-once append semantics.
   313  func EnableWriteRetries(enable bool) WriterOption {
   314  	return func(ms *ManagedStream) {
   315  		if enable {
   316  			ms.retry = newStatelessRetryer()
   317  		}
   318  	}
   319  }
   320  
// AppendOption is a functional option that can be passed when appending data
// with a managed stream instance.  Each option receives the pending write for
// the append and mutates it in place.
type AppendOption func(*pendingWrite)
   323  
   324  // UpdateSchemaDescriptor is used to update the descriptor message schema associated
   325  // with a given stream.
   326  func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption {
   327  	return func(pw *pendingWrite) {
   328  		pw.reqTmpl = pw.reqTmpl.revise(reviseProtoSchema(schema))
   329  	}
   330  }
   331  
   332  // UpdateMissingValueInterpretations updates the per-column missing-value intepretations settings,
   333  // and is retained for subsequent writes.  See the WithMissingValueInterpretations WriterOption for
   334  // more details.
   335  func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption {
   336  	return func(pw *pendingWrite) {
   337  		pw.reqTmpl = pw.reqTmpl.revise(reviseMissingValueInterpretations(mvi))
   338  	}
   339  }
   340  
   341  // UpdateDefaultMissingValueInterpretation updates the default intepretations setting for the stream,
   342  // and is retained for subsequent writes.  See the WithDefaultMissingValueInterpretations WriterOption for
   343  // more details.
   344  func UpdateDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption {
   345  	return func(pw *pendingWrite) {
   346  		pw.reqTmpl = pw.reqTmpl.revise(reviseDefaultMissingValueInterpretation(def))
   347  	}
   348  }
   349  
   350  // WithOffset sets an explicit offset value for this append request.
   351  func WithOffset(offset int64) AppendOption {
   352  	return func(pw *pendingWrite) {
   353  		pw.req.Offset = &wrapperspb.Int64Value{
   354  			Value: offset,
   355  		}
   356  	}
   357  }
   358  

View as plain text