1 // Copyright 2021 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package managedwriter 16 17 import ( 18 "cloud.google.com/go/bigquery/storage/apiv1/storagepb" 19 "github.com/googleapis/gax-go/v2" 20 "google.golang.org/api/option" 21 "google.golang.org/api/option/internaloption" 22 "google.golang.org/protobuf/types/descriptorpb" 23 "google.golang.org/protobuf/types/known/wrapperspb" 24 ) 25 26 // encapsulates custom client-level config settings. 27 type writerClientConfig struct { 28 useMultiplex bool 29 maxMultiplexPoolSize int 30 defaultInflightRequests int 31 defaultInflightBytes int 32 defaultAppendRowsCallOptions []gax.CallOption 33 } 34 35 // newWriterClientConfig builds a client config based on package-specific custom ClientOptions. 36 func newWriterClientConfig(opts ...option.ClientOption) *writerClientConfig { 37 conf := &writerClientConfig{} 38 for _, opt := range opts { 39 if wOpt, ok := opt.(writerClientOption); ok { 40 wOpt.ApplyWriterOpt(conf) 41 } 42 } 43 44 // Normalize the config to ensure we're dealing with sane values. 45 if conf.useMultiplex { 46 if conf.maxMultiplexPoolSize < 1 { 47 conf.maxMultiplexPoolSize = 1 48 } 49 } 50 if conf.defaultInflightBytes < 0 { 51 conf.defaultInflightBytes = 0 52 } 53 if conf.defaultInflightRequests < 0 { 54 conf.defaultInflightRequests = 0 55 } 56 return conf 57 } 58 59 // writerClientOption allows us to extend ClientOptions for client-specific needs. 60 type writerClientOption interface { 61 option.ClientOption 62 ApplyWriterOpt(*writerClientConfig) 63 } 64 65 // WithMultiplexing is an EXPERIMENTAL option that controls connection sharing 66 // when instantiating the Client. Only writes to default streams can leverage the 67 // multiplex pool. Internally, the client maintains a pool of connections per BigQuery 68 // destination region, and will grow the pool to it's maximum allowed size if there's 69 // sufficient traffic on the shared connection(s). 70 // 71 // This ClientOption is EXPERIMENTAL and subject to change. 72 func WithMultiplexing() option.ClientOption { 73 return &enableMultiplexSetting{useMultiplex: true} 74 } 75 76 type enableMultiplexSetting struct { 77 internaloption.EmbeddableAdapter 78 useMultiplex bool 79 } 80 81 func (s *enableMultiplexSetting) ApplyWriterOpt(c *writerClientConfig) { 82 c.useMultiplex = s.useMultiplex 83 } 84 85 // WithMultiplexPoolLimit is an EXPERIMENTAL option that sets the maximum 86 // shared multiplex pool size when instantiating the Client. If multiplexing 87 // is not enabled, this setting is ignored. By default, the limit is a single 88 // shared connection. This limit is applied per destination region. 89 // 90 // This ClientOption is EXPERIMENTAL and subject to change. 91 func WithMultiplexPoolLimit(maxSize int) option.ClientOption { 92 return &maxMultiplexPoolSizeSetting{maxSize: maxSize} 93 } 94 95 type maxMultiplexPoolSizeSetting struct { 96 internaloption.EmbeddableAdapter 97 maxSize int 98 } 99 100 func (s *maxMultiplexPoolSizeSetting) ApplyWriterOpt(c *writerClientConfig) { 101 c.maxMultiplexPoolSize = s.maxSize 102 } 103 104 // WithDefaultInflightRequests is an EXPERIMENTAL ClientOption for controlling 105 // the default limit of how many individual AppendRows write requests can 106 // be in flight on a connection at a time. This limit is enforced on all connections 107 // created by the instantiated Client. 108 // 109 // Note: the WithMaxInflightRequests WriterOption can still be used to control 110 // the behavior for individual ManagedStream writers when not using multiplexing. 111 // 112 // This ClientOption is EXPERIMENTAL and subject to change. 113 func WithDefaultInflightRequests(n int) option.ClientOption { 114 return &defaultInflightRequestsSetting{maxRequests: n} 115 } 116 117 type defaultInflightRequestsSetting struct { 118 internaloption.EmbeddableAdapter 119 maxRequests int 120 } 121 122 func (s *defaultInflightRequestsSetting) ApplyWriterOpt(c *writerClientConfig) { 123 c.defaultInflightRequests = s.maxRequests 124 } 125 126 // WithDefaultInflightBytes is an EXPERIMENTAL ClientOption for controlling 127 // the default byte limit for how many individual AppendRows write requests can 128 // be in flight on a connection at a time. This limit is enforced on all connections 129 // created by the instantiated Client. 130 // 131 // Note: the WithMaxInflightBytes WriterOption can still be used to control 132 // the behavior for individual ManagedStream writers when not using multiplexing. 133 // 134 // This ClientOption is EXPERIMENTAL and subject to change. 135 func WithDefaultInflightBytes(n int) option.ClientOption { 136 return &defaultInflightBytesSetting{maxBytes: n} 137 } 138 139 type defaultInflightBytesSetting struct { 140 internaloption.EmbeddableAdapter 141 maxBytes int 142 } 143 144 func (s *defaultInflightBytesSetting) ApplyWriterOpt(c *writerClientConfig) { 145 c.defaultInflightBytes = s.maxBytes 146 } 147 148 // WithDefaultAppendRowsCallOption is an EXPERIMENTAL ClientOption for controlling 149 // the gax.CallOptions passed when opening the underlying AppendRows bidi 150 // stream connections used by this library to communicate with the BigQuery 151 // Storage service. This option is propagated to all 152 // connections created by the instantiated Client. 153 // 154 // Note: the WithAppendRowsCallOption WriterOption can still be used to control 155 // the behavior for individual ManagedStream writers that don't participate 156 // in multiplexing. 157 // 158 // This ClientOption is EXPERIMENTAL and subject to change. 159 func WithDefaultAppendRowsCallOption(o gax.CallOption) option.ClientOption { 160 return &defaultAppendRowsCallOptionSetting{opt: o} 161 } 162 163 type defaultAppendRowsCallOptionSetting struct { 164 internaloption.EmbeddableAdapter 165 opt gax.CallOption 166 } 167 168 func (s *defaultAppendRowsCallOptionSetting) ApplyWriterOpt(c *writerClientConfig) { 169 c.defaultAppendRowsCallOptions = append(c.defaultAppendRowsCallOptions, s.opt) 170 } 171 172 // WriterOption are variadic options used to configure a ManagedStream instance. 173 type WriterOption func(*ManagedStream) 174 175 // WithType sets the stream type for the managed stream. 176 func WithType(st StreamType) WriterOption { 177 return func(ms *ManagedStream) { 178 ms.streamSettings.streamType = st 179 } 180 } 181 182 // WithStreamName allows users to set the stream name this writer will 183 // append to explicitly. By default, the managed client will create the 184 // stream when instantiated if necessary. 185 // 186 // Note: Supplying this option causes other options which affect stream construction 187 // such as WithStreamType and WithDestinationTable to be ignored. 188 func WithStreamName(name string) WriterOption { 189 return func(ms *ManagedStream) { 190 ms.streamSettings.streamID = name 191 } 192 } 193 194 // WithDestinationTable specifies the destination table to which a created 195 // stream will append rows. Format of the table: 196 // 197 // projects/{projectid}/datasets/{dataset}/tables/{table} 198 func WithDestinationTable(destTable string) WriterOption { 199 return func(ms *ManagedStream) { 200 ms.streamSettings.destinationTable = destTable 201 } 202 } 203 204 // WithMaxInflightRequests bounds the inflight appends on the write connection. 205 // 206 // Note: See the WithDefaultInflightRequests ClientOption for setting a default 207 // when instantiating a client, rather than setting this limit per-writer. 208 // This WriterOption is ignored for ManagedStreams that participate in multiplexing. 209 func WithMaxInflightRequests(n int) WriterOption { 210 return func(ms *ManagedStream) { 211 ms.streamSettings.MaxInflightRequests = n 212 } 213 } 214 215 // WithMaxInflightBytes bounds the inflight append request bytes on the write connection. 216 // 217 // Note: See the WithDefaultInflightBytes ClientOption for setting a default 218 // when instantiating a client, rather than setting this limit per-writer. 219 // This WriterOption is ignored for ManagedStreams that participate in multiplexing. 220 func WithMaxInflightBytes(n int) WriterOption { 221 return func(ms *ManagedStream) { 222 ms.streamSettings.MaxInflightBytes = n 223 } 224 } 225 226 // WithTraceID allows instruments requests to the service with a custom trace prefix. 227 // This is generally for diagnostic purposes only. 228 func WithTraceID(traceID string) WriterOption { 229 return func(ms *ManagedStream) { 230 ms.streamSettings.TraceID = traceID 231 } 232 } 233 234 // WithSchemaDescriptor describes the format of the serialized data being sent by 235 // AppendRows calls on the stream. 236 func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { 237 return func(ms *ManagedStream) { 238 ms.curTemplate = ms.curTemplate.revise(reviseProtoSchema(dp)) 239 } 240 } 241 242 // WithMissingValueInterpretations controls how missing values are interpreted 243 // for individual columns. 244 // 245 // You must provide a map to indicate how to interpret missing value for some fields. Missing 246 // values are fields present in user schema but missing in rows. The key is 247 // the field name. The value is the interpretation of missing values for the 248 // field. 249 // 250 // For example, the following option would indicate that missing values in the "foo" 251 // column are interpreted as null, whereas missing values in the "bar" column are 252 // treated as the default value: 253 // 254 // WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ 255 // "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, 256 // "bar": storagepb.AppendRowsRequest_NULL_VALUE, 257 // }) 258 // 259 // If a field is not in this map and has missing values, the missing values 260 // in this field are interpreted as NULL unless overridden with a default missing 261 // value interpretation. 262 // 263 // Currently, field name can only be top-level column name, can't be a struct 264 // field path like 'foo.bar'. 265 func WithMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption { 266 return func(ms *ManagedStream) { 267 ms.curTemplate = ms.curTemplate.revise(reviseMissingValueInterpretations(mvi)) 268 } 269 } 270 271 // WithDefaultMissingValueInterpretation controls how missing values are interpreted by 272 // for a given stream. See WithMissingValueIntepretations for more information about 273 // missing values. 274 // 275 // WithMissingValueIntepretations set for individual colums can override the default chosen 276 // with this option. 277 // 278 // For example, if you want to write 279 // `NULL` instead of using default values for some columns, you can set 280 // `default_missing_value_interpretation` to `DEFAULT_VALUE` and at the same 281 // time, set `missing_value_interpretations` to `NULL_VALUE` on those columns. 282 func WithDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption { 283 return func(ms *ManagedStream) { 284 ms.curTemplate = ms.curTemplate.revise(reviseDefaultMissingValueInterpretation(def)) 285 } 286 } 287 288 // WithDataOrigin is used to attach an origin context to the instrumentation metrics 289 // emitted by the library. 290 func WithDataOrigin(dataOrigin string) WriterOption { 291 return func(ms *ManagedStream) { 292 ms.streamSettings.dataOrigin = dataOrigin 293 } 294 } 295 296 // WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when 297 // it opens the underlying append stream. 298 // 299 // Note: See the DefaultAppendRowsCallOption ClientOption for setting defaults 300 // when instantiating a client, rather than setting this limit per-writer. This WriterOption 301 // is ignored for ManagedStream writers that participate in multiplexing. 302 func WithAppendRowsCallOption(o gax.CallOption) WriterOption { 303 return func(ms *ManagedStream) { 304 ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, o) 305 } 306 } 307 308 // EnableWriteRetries enables ManagedStream to automatically retry failed appends. 309 // 310 // Enabling retries is best suited for cases where users want to achieve at-least-once 311 // append semantics. Use of automatic retries may complicate patterns where the user 312 // is designing for exactly-once append semantics. 313 func EnableWriteRetries(enable bool) WriterOption { 314 return func(ms *ManagedStream) { 315 if enable { 316 ms.retry = newStatelessRetryer() 317 } 318 } 319 } 320 321 // AppendOption are options that can be passed when appending data with a managed stream instance. 322 type AppendOption func(*pendingWrite) 323 324 // UpdateSchemaDescriptor is used to update the descriptor message schema associated 325 // with a given stream. 326 func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption { 327 return func(pw *pendingWrite) { 328 pw.reqTmpl = pw.reqTmpl.revise(reviseProtoSchema(schema)) 329 } 330 } 331 332 // UpdateMissingValueInterpretations updates the per-column missing-value intepretations settings, 333 // and is retained for subsequent writes. See the WithMissingValueInterpretations WriterOption for 334 // more details. 335 func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { 336 return func(pw *pendingWrite) { 337 pw.reqTmpl = pw.reqTmpl.revise(reviseMissingValueInterpretations(mvi)) 338 } 339 } 340 341 // UpdateDefaultMissingValueInterpretation updates the default intepretations setting for the stream, 342 // and is retained for subsequent writes. See the WithDefaultMissingValueInterpretations WriterOption for 343 // more details. 344 func UpdateDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { 345 return func(pw *pendingWrite) { 346 pw.reqTmpl = pw.reqTmpl.revise(reviseDefaultMissingValueInterpretation(def)) 347 } 348 } 349 350 // WithOffset sets an explicit offset value for this append request. 351 func WithOffset(offset int64) AppendOption { 352 return func(pw *pendingWrite) { 353 pw.req.Offset = &wrapperspb.Int64Value{ 354 Value: offset, 355 } 356 } 357 } 358