// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package managedwriter import ( "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/wrapperspb" ) // encapsulates custom client-level config settings. type writerClientConfig struct { useMultiplex bool maxMultiplexPoolSize int defaultInflightRequests int defaultInflightBytes int defaultAppendRowsCallOptions []gax.CallOption } // newWriterClientConfig builds a client config based on package-specific custom ClientOptions. func newWriterClientConfig(opts ...option.ClientOption) *writerClientConfig { conf := &writerClientConfig{} for _, opt := range opts { if wOpt, ok := opt.(writerClientOption); ok { wOpt.ApplyWriterOpt(conf) } } // Normalize the config to ensure we're dealing with sane values. if conf.useMultiplex { if conf.maxMultiplexPoolSize < 1 { conf.maxMultiplexPoolSize = 1 } } if conf.defaultInflightBytes < 0 { conf.defaultInflightBytes = 0 } if conf.defaultInflightRequests < 0 { conf.defaultInflightRequests = 0 } return conf } // writerClientOption allows us to extend ClientOptions for client-specific needs. 
type writerClientOption interface { option.ClientOption ApplyWriterOpt(*writerClientConfig) } // WithMultiplexing is an EXPERIMENTAL option that controls connection sharing // when instantiating the Client. Only writes to default streams can leverage the // multiplex pool. Internally, the client maintains a pool of connections per BigQuery // destination region, and will grow the pool to it's maximum allowed size if there's // sufficient traffic on the shared connection(s). // // This ClientOption is EXPERIMENTAL and subject to change. func WithMultiplexing() option.ClientOption { return &enableMultiplexSetting{useMultiplex: true} } type enableMultiplexSetting struct { internaloption.EmbeddableAdapter useMultiplex bool } func (s *enableMultiplexSetting) ApplyWriterOpt(c *writerClientConfig) { c.useMultiplex = s.useMultiplex } // WithMultiplexPoolLimit is an EXPERIMENTAL option that sets the maximum // shared multiplex pool size when instantiating the Client. If multiplexing // is not enabled, this setting is ignored. By default, the limit is a single // shared connection. This limit is applied per destination region. // // This ClientOption is EXPERIMENTAL and subject to change. func WithMultiplexPoolLimit(maxSize int) option.ClientOption { return &maxMultiplexPoolSizeSetting{maxSize: maxSize} } type maxMultiplexPoolSizeSetting struct { internaloption.EmbeddableAdapter maxSize int } func (s *maxMultiplexPoolSizeSetting) ApplyWriterOpt(c *writerClientConfig) { c.maxMultiplexPoolSize = s.maxSize } // WithDefaultInflightRequests is an EXPERIMENTAL ClientOption for controlling // the default limit of how many individual AppendRows write requests can // be in flight on a connection at a time. This limit is enforced on all connections // created by the instantiated Client. // // Note: the WithMaxInflightRequests WriterOption can still be used to control // the behavior for individual ManagedStream writers when not using multiplexing. 
// // This ClientOption is EXPERIMENTAL and subject to change. func WithDefaultInflightRequests(n int) option.ClientOption { return &defaultInflightRequestsSetting{maxRequests: n} } type defaultInflightRequestsSetting struct { internaloption.EmbeddableAdapter maxRequests int } func (s *defaultInflightRequestsSetting) ApplyWriterOpt(c *writerClientConfig) { c.defaultInflightRequests = s.maxRequests } // WithDefaultInflightBytes is an EXPERIMENTAL ClientOption for controlling // the default byte limit for how many individual AppendRows write requests can // be in flight on a connection at a time. This limit is enforced on all connections // created by the instantiated Client. // // Note: the WithMaxInflightBytes WriterOption can still be used to control // the behavior for individual ManagedStream writers when not using multiplexing. // // This ClientOption is EXPERIMENTAL and subject to change. func WithDefaultInflightBytes(n int) option.ClientOption { return &defaultInflightBytesSetting{maxBytes: n} } type defaultInflightBytesSetting struct { internaloption.EmbeddableAdapter maxBytes int } func (s *defaultInflightBytesSetting) ApplyWriterOpt(c *writerClientConfig) { c.defaultInflightBytes = s.maxBytes } // WithDefaultAppendRowsCallOption is an EXPERIMENTAL ClientOption for controlling // the gax.CallOptions passed when opening the underlying AppendRows bidi // stream connections used by this library to communicate with the BigQuery // Storage service. This option is propagated to all // connections created by the instantiated Client. // // Note: the WithAppendRowsCallOption WriterOption can still be used to control // the behavior for individual ManagedStream writers that don't participate // in multiplexing. // // This ClientOption is EXPERIMENTAL and subject to change. 
func WithDefaultAppendRowsCallOption(o gax.CallOption) option.ClientOption { return &defaultAppendRowsCallOptionSetting{opt: o} } type defaultAppendRowsCallOptionSetting struct { internaloption.EmbeddableAdapter opt gax.CallOption } func (s *defaultAppendRowsCallOptionSetting) ApplyWriterOpt(c *writerClientConfig) { c.defaultAppendRowsCallOptions = append(c.defaultAppendRowsCallOptions, s.opt) } // WriterOption are variadic options used to configure a ManagedStream instance. type WriterOption func(*ManagedStream) // WithType sets the stream type for the managed stream. func WithType(st StreamType) WriterOption { return func(ms *ManagedStream) { ms.streamSettings.streamType = st } } // WithStreamName allows users to set the stream name this writer will // append to explicitly. By default, the managed client will create the // stream when instantiated if necessary. // // Note: Supplying this option causes other options which affect stream construction // such as WithStreamType and WithDestinationTable to be ignored. func WithStreamName(name string) WriterOption { return func(ms *ManagedStream) { ms.streamSettings.streamID = name } } // WithDestinationTable specifies the destination table to which a created // stream will append rows. Format of the table: // // projects/{projectid}/datasets/{dataset}/tables/{table} func WithDestinationTable(destTable string) WriterOption { return func(ms *ManagedStream) { ms.streamSettings.destinationTable = destTable } } // WithMaxInflightRequests bounds the inflight appends on the write connection. // // Note: See the WithDefaultInflightRequests ClientOption for setting a default // when instantiating a client, rather than setting this limit per-writer. // This WriterOption is ignored for ManagedStreams that participate in multiplexing. 
func WithMaxInflightRequests(n int) WriterOption {
	return func(writer *ManagedStream) {
		writer.streamSettings.MaxInflightRequests = n
	}
}

// WithMaxInflightBytes bounds the inflight append request bytes on the write connection.
//
// Note: See the WithDefaultInflightBytes ClientOption for setting a default
// when instantiating a client, rather than setting this limit per-writer.
// This WriterOption is ignored for ManagedStreams that participate in multiplexing.
func WithMaxInflightBytes(n int) WriterOption {
	return func(writer *ManagedStream) {
		writer.streamSettings.MaxInflightBytes = n
	}
}

// WithTraceID instruments requests to the service with a custom trace prefix.
// This is generally for diagnostic purposes only.
func WithTraceID(traceID string) WriterOption {
	return func(writer *ManagedStream) {
		writer.streamSettings.TraceID = traceID
	}
}

// WithSchemaDescriptor describes the format of the serialized data being sent by
// AppendRows calls on the stream.
func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption {
	return func(writer *ManagedStream) {
		// Revise the writer's current template with the new schema and keep
		// the revised template.
		schemaChange := reviseProtoSchema(dp)
		writer.curTemplate = writer.curTemplate.revise(schemaChange)
	}
}

// WithMissingValueInterpretations controls how missing values are interpreted
// for individual columns.
//
// You must provide a map to indicate how to interpret missing value for some fields. Missing
// values are fields present in user schema but missing in rows. The key is
// the field name. The value is the interpretation of missing values for the
// field.
// // For example, the following option would indicate that missing values in the "foo" // column are interpreted as null, whereas missing values in the "bar" column are // treated as the default value: // // WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ // "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, // "bar": storagepb.AppendRowsRequest_NULL_VALUE, // }) // // If a field is not in this map and has missing values, the missing values // in this field are interpreted as NULL unless overridden with a default missing // value interpretation. // // Currently, field name can only be top-level column name, can't be a struct // field path like 'foo.bar'. func WithMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption { return func(ms *ManagedStream) { ms.curTemplate = ms.curTemplate.revise(reviseMissingValueInterpretations(mvi)) } } // WithDefaultMissingValueInterpretation controls how missing values are interpreted by // for a given stream. See WithMissingValueIntepretations for more information about // missing values. // // WithMissingValueIntepretations set for individual colums can override the default chosen // with this option. // // For example, if you want to write // `NULL` instead of using default values for some columns, you can set // `default_missing_value_interpretation` to `DEFAULT_VALUE` and at the same // time, set `missing_value_interpretations` to `NULL_VALUE` on those columns. func WithDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption { return func(ms *ManagedStream) { ms.curTemplate = ms.curTemplate.revise(reviseDefaultMissingValueInterpretation(def)) } } // WithDataOrigin is used to attach an origin context to the instrumentation metrics // emitted by the library. 
func WithDataOrigin(dataOrigin string) WriterOption { return func(ms *ManagedStream) { ms.streamSettings.dataOrigin = dataOrigin } } // WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when // it opens the underlying append stream. // // Note: See the DefaultAppendRowsCallOption ClientOption for setting defaults // when instantiating a client, rather than setting this limit per-writer. This WriterOption // is ignored for ManagedStream writers that participate in multiplexing. func WithAppendRowsCallOption(o gax.CallOption) WriterOption { return func(ms *ManagedStream) { ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, o) } } // EnableWriteRetries enables ManagedStream to automatically retry failed appends. // // Enabling retries is best suited for cases where users want to achieve at-least-once // append semantics. Use of automatic retries may complicate patterns where the user // is designing for exactly-once append semantics. func EnableWriteRetries(enable bool) WriterOption { return func(ms *ManagedStream) { if enable { ms.retry = newStatelessRetryer() } } } // AppendOption are options that can be passed when appending data with a managed stream instance. type AppendOption func(*pendingWrite) // UpdateSchemaDescriptor is used to update the descriptor message schema associated // with a given stream. func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption { return func(pw *pendingWrite) { pw.reqTmpl = pw.reqTmpl.revise(reviseProtoSchema(schema)) } } // UpdateMissingValueInterpretations updates the per-column missing-value intepretations settings, // and is retained for subsequent writes. See the WithMissingValueInterpretations WriterOption for // more details. 
func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption {
	return func(write *pendingWrite) {
		// Revise the pending write's request template with the per-column settings.
		columnChange := reviseMissingValueInterpretations(mvi)
		write.reqTmpl = write.reqTmpl.revise(columnChange)
	}
}

// UpdateDefaultMissingValueInterpretation updates the default interpretation setting for the stream,
// and is retained for subsequent writes. See the WithDefaultMissingValueInterpretation WriterOption for
// more details.
func UpdateDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption {
	return func(write *pendingWrite) {
		// Revise the pending write's request template with the stream-wide default.
		defaultChange := reviseDefaultMissingValueInterpretation(def)
		write.reqTmpl = write.reqTmpl.revise(defaultChange)
	}
}

// WithOffset sets an explicit offset value for this append request.
func WithOffset(offset int64) AppendOption {
	return func(write *pendingWrite) {
		// wrapperspb.Int64 builds the same *Int64Value as the struct literal.
		write.req.Offset = wrapperspb.Int64(offset)
	}
}