// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
	"context"
	"fmt"
	"runtime"
	"strings"
	"sync"

	"cloud.google.com/go/bigquery/internal"
	storage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/internal/detect"
	"github.com/google/uuid"
	"github.com/googleapis/gax-go/v2"
	"google.golang.org/api/option"
	"google.golang.org/grpc/metadata"
)

// DetectProjectID is a sentinel value that instructs NewClient to detect the
// project ID. It is given in place of the projectID argument. NewClient will
// use the project ID from the given credentials or the default credentials
// (https://developers.google.com/accounts/docs/application-default-credentials)
// if no credentials were provided. When providing credentials, not all
// options will allow NewClient to extract the project ID. Specifically, a JWT
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"

// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
	rawClient *storage.BigQueryWriteClient
	projectID string

	// Retained context, primarily used for connection management and the
	// underlying client.
	ctx    context.Context
	cancel context.CancelFunc

	// cfg retains general settings (custom ClientOptions).
	cfg *writerClientConfig

	// mu guards access to shared connectionPool instances.
	mu sync.Mutex
	// When multiplexing is enabled, this map retains connectionPools keyed by region ID.
	pools map[string]*connectionPool
}

// NewClient instantiates a new client.
//
// The context provided here is retained and used for background connection management
// between the client and the BigQuery Storage service.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
	// Set a reasonable default for the gRPC connection pool size.
	numConns := runtime.GOMAXPROCS(0)
	if numConns > 4 {
		numConns = 4
	}
	o := []option.ClientOption{
		option.WithGRPCConnectionPool(numConns),
	}
	o = append(o, opts...)

	cCtx, cancel := context.WithCancel(ctx)

	rawClient, err := storage.NewBigQueryWriteClient(cCtx, o...)
	if err != nil {
		cancel()
		return nil, err
	}
	rawClient.SetGoogleClientInfo("gccl", internal.Version)

	// Handle project autodetection.
	projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
	if err != nil {
		cancel()
		return nil, err
	}

	return &Client{
		rawClient: rawClient,
		projectID: projectID,
		ctx:       cCtx,
		cancel:    cancel,
		cfg:       newWriterClientConfig(opts...),
		pools:     make(map[string]*connectionPool),
	}, nil
}
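
// A minimal usage sketch for NewClient; "my-project" is a hypothetical
// placeholder, and DetectProjectID may be passed instead to resolve the
// project from the ambient credentials:
//
//	ctx := context.Background()
//	client, err := managedwriter.NewClient(ctx, "my-project")
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer client.Close()
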
// Close releases resources held by the client.
func (c *Client) Close() error {
	// Shut down the per-region pools.
	c.mu.Lock()
	defer c.mu.Unlock()
	var firstErr error
	for _, pool := range c.pools {
		if err := pool.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	// Close the underlying client stub.
	if err := c.rawClient.Close(); err != nil && firstErr == nil {
		firstErr = err
	}
	// Cancel the retained client context.
	if c.cancel != nil {
		c.cancel()
	}
	return firstErr
}

// NewManagedStream establishes a new managed stream for appending data into a table.
//
// Context here is retained for use by the underlying streaming connections the managed stream may create.
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
	return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...)
}

// createOpenF builds the opener function we need to access the AppendRows bidi stream.
func createOpenF(streamFunc streamClientFunc, routingHeader string) func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
	return func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
		if routingHeader != "" {
			ctx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", routingHeader)
		}
		arc, err := streamFunc(ctx, opts...)
		if err != nil {
			return nil, err
		}
		return arc, nil
	}
}

func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) {
	// First, create a minimal managed stream.
	writer := &ManagedStream{
		id:             newUUID(writerIDPrefix),
		c:              c,
		streamSettings: defaultStreamSettings(),
		curTemplate:    newVersionedTemplate(),
	}
	// Apply writer options.
	for _, opt := range opts {
		opt(writer)
	}
	// skipSetup allows for customization at test time.
	// Examine our configured writer and apply settings to the real one.
	if !skipSetup {
		if err := c.validateOptions(ctx, writer); err != nil {
			return nil, err
		}

		if writer.streamSettings.streamID == "" {
			// Not instantiated with a stream; construct one.
			streamName := fmt.Sprintf("%s/streams/_default", writer.streamSettings.destinationTable)
			if writer.streamSettings.streamType != DefaultStream {
				// For everything but a default stream, we create a new stream on behalf of the user.
				req := &storagepb.CreateWriteStreamRequest{
					Parent: writer.streamSettings.destinationTable,
					WriteStream: &storagepb.WriteStream{
						Type: streamTypeToEnum(writer.streamSettings.streamType),
					}}
				resp, err := writer.c.rawClient.CreateWriteStream(ctx, req)
				if err != nil {
					return nil, fmt.Errorf("couldn't create write stream: %w", err)
				}
				streamName = resp.GetName()
			}
			writer.streamSettings.streamID = streamName
		}
	}

	// We maintain a pool per region, and attach all exclusive and multiplex writers to that pool.
	pool, err := c.resolvePool(ctx, writer.streamSettings, streamFunc)
	if err != nil {
		return nil, err
	}
	// Add the writer to the pool.
	if err := pool.addWriter(writer); err != nil {
		return nil, err
	}
	writer.ctx, writer.cancel = context.WithCancel(ctx)
	// Attach any tag keys to the context on the writer, so instrumentation works as expected.
	writer.ctx = setupWriterStatContext(writer)
	return writer, nil
}
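
// A sketch of constructing a default-stream writer and appending a batch of
// rows. It assumes the caller supplies the serialized protobuf rows
// (encodedRows, a [][]byte) and a schema descriptor for WithSchemaDescriptor;
// both are placeholders here:
//
//	table := managedwriter.TableParentFromParts("my-project", "my_dataset", "my_table")
//	ms, err := client.NewManagedStream(ctx,
//		managedwriter.WithDestinationTable(table),
//		managedwriter.WithType(managedwriter.DefaultStream),
//		managedwriter.WithSchemaDescriptor(descriptor),
//	)
//	if err != nil {
//		// TODO: handle error.
//	}
//	result, err := ms.AppendRows(ctx, encodedRows)
//	if err != nil {
//		// TODO: handle error.
//	}
//	// Block until the append is acknowledged by the service.
//	if _, err := result.GetResult(ctx); err != nil {
//		// TODO: handle error.
//	}
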
// validateOptions is used to validate that we received a sane/compatible set of WriterOptions
// for constructing a new managed stream.
func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
	if ms == nil {
		return fmt.Errorf("no managed stream definition")
	}
	if ms.streamSettings.streamID != "" {
		// User supplied a stream; verify it exists.
		info, err := c.getWriteStream(ctx, ms.streamSettings.streamID, false)
		if err != nil {
			return fmt.Errorf("a stream name was specified, but lookup of the stream failed: %w", err)
		}
		// Update type and destination based on stream metadata.
		ms.streamSettings.streamType = StreamType(info.Type.String())
		ms.streamSettings.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
	}
	if ms.streamSettings.destinationTable == "" {
		return fmt.Errorf("no destination table specified")
	}
	// We could auto-select DEFAULT here, but let's force users to be specific for now.
	if ms.StreamType() == "" {
		return fmt.Errorf("stream type wasn't specified")
	}
	return nil
}

// resolvePool either returns an existing connectionPool, or returns a new pool
// if this is the first writer in a given region.
func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	resp, err := c.getWriteStream(ctx, settings.streamID, false)
	if err != nil {
		return nil, err
	}
	loc := resp.GetLocation()
	if pool, ok := c.pools[loc]; ok {
		return pool, nil
	}

	// No existing pool available; create one for the location and add it to the shared pools.
	pool, err := c.createPool(loc, streamFunc)
	if err != nil {
		return nil, err
	}
	c.pools[loc] = pool
	return pool, nil
}

// createPool builds a connectionPool.
func (c *Client) createPool(location string, streamFunc streamClientFunc) (*connectionPool, error) {
	cCtx, cancel := context.WithCancel(c.ctx)

	if c.cfg == nil {
		cancel()
		return nil, fmt.Errorf("missing client config")
	}

	var routingHeader string
	/*
	 * TODO: set once backend respects the new routing header
	 * if location != "" && c.projectID != "" {
	 *   routingHeader = fmt.Sprintf("write_location=projects/%s/locations/%s", c.projectID, location)
	 * }
	 */

	pool := &connectionPool{
		id:                 newUUID(poolIDPrefix),
		location:           location,
		ctx:                cCtx,
		cancel:             cancel,
		open:               createOpenF(streamFunc, routingHeader),
		callOptions:        c.cfg.defaultAppendRowsCallOptions,
		baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes),
	}
	router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize)
	if err := pool.activateRouter(router); err != nil {
		return nil, err
	}
	return pool, nil
}

// BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same
// parent table.
//
// Streams must be finalized before commit and cannot be committed multiple
// times. Once a stream is committed, data in the stream becomes available
// for read operations.
func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
	return c.rawClient.BatchCommitWriteStreams(ctx, req, opts...)
}

// CreateWriteStream creates a write stream to the given table.
// Additionally, every table has a special stream named '_default'
// to which data can be written. This stream doesn't need to be created using
// CreateWriteStream. It is a stream that can be used simultaneously by any
// number of clients. Data written to this stream is considered committed as
// soon as an acknowledgement is received.
func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
	return c.rawClient.CreateWriteStream(ctx, req, opts...)
}
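
// A sketch of the PENDING stream lifecycle using the wrappers above. It
// assumes pendingStream is a managed stream created with
// WithType(managedwriter.PendingStream) whose appends have all completed:
//
//	// Finalize the stream so no further appends are accepted.
//	if _, err := pendingStream.Finalize(ctx); err != nil {
//		// TODO: handle error.
//	}
//	// Atomically commit the finalized stream(s) into the parent table.
//	resp, err := client.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{
//		Parent:       managedwriter.TableParentFromStreamName(pendingStream.StreamName()),
//		WriteStreams: []string{pendingStream.StreamName()},
//	})
//	if err != nil {
//		// TODO: handle error.
//	}
//	// A successful commit reports a commit time; per-stream failures
//	// surface in resp.StreamErrors.
//	_ = resp
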
// GetWriteStream returns information about a given WriteStream.
func (c *Client) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
	return c.rawClient.GetWriteStream(ctx, req, opts...)
}

// getWriteStream is an internal version of GetWriteStream used for writer setup and validation.
func (c *Client) getWriteStream(ctx context.Context, streamName string, fullView bool) (*storagepb.WriteStream, error) {
	req := &storagepb.GetWriteStreamRequest{
		Name: streamName,
	}
	if fullView {
		req.View = storagepb.WriteStreamView_FULL
	}
	return c.rawClient.GetWriteStream(ctx, req)
}

// TableParentFromStreamName is a utility function for extracting the parent table
// prefix from a stream name. When an invalid stream ID is passed, this simply returns
// the original stream name.
func TableParentFromStreamName(streamName string) string {
	// Stream IDs have the following format:
	// projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}
	parts := strings.SplitN(streamName, "/", 7)
	if len(parts) < 7 {
		// Invalid; just pass back the input.
		return streamName
	}
	return strings.Join(parts[:6], "/")
}

// TableParentFromParts constructs a table identifier using individual identifiers and
// returns a string in the form "projects/{project}/datasets/{dataset}/tables/{table}".
func TableParentFromParts(projectID, datasetID, tableID string) string {
	return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
}

// newUUID simplifies generating UUIDs for internal resources.
func newUUID(prefix string) string {
	id := uuid.New()
	return fmt.Sprintf("%s_%s", prefix, id.String())
}

// canMultiplex returns true if the input identifier supports multiplexing. Currently, the
// only stream type that supports multiplexing is the default stream.
func canMultiplex(in string) bool {
	// TODO: strengthen validation
	return strings.HasSuffix(in, "default")
}
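
// An illustration of the name helpers above; all identifiers are hypothetical:
//
//	table := managedwriter.TableParentFromParts("my-project", "my_dataset", "my_table")
//	// table == "projects/my-project/datasets/my_dataset/tables/my_table"
//	parent := managedwriter.TableParentFromStreamName(table + "/streams/_default")
//	// parent == table; the trailing "/streams/_default" suffix is stripped.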