...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/client.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
	"context"
	"fmt"
	"runtime"
	"strings"
	"sync"

	"cloud.google.com/go/bigquery/internal"
	storage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/internal/detect"
	"github.com/google/uuid"
	"github.com/googleapis/gax-go/v2"
	"google.golang.org/api/option"
	"google.golang.org/grpc/metadata"
)
// DetectProjectID is a sentinel value that instructs NewClient to detect the
// project ID. It is given in place of the projectID argument. NewClient will
// use the project ID from the given credentials or the default credentials
// (https://developers.google.com/accounts/docs/application-default-credentials)
// if no credentials were provided. When providing credentials, not all
// options will allow NewClient to extract the project ID. Specifically, a JWT
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"
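
// Illustrative usage sketch (not part of this file): constructing a client
// with project autodetection. Assumes Application Default Credentials are
// available in the environment.
//
//	ctx := context.Background()
//	client, err := managedwriter.NewClient(ctx, managedwriter.DetectProjectID)
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer client.Close()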

// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
	rawClient *storage.BigQueryWriteClient
	projectID string

	// ctx is the retained context, used primarily for connection management
	// and the underlying client.
	ctx    context.Context
	cancel context.CancelFunc

	// cfg retains general settings (custom ClientOptions).
	cfg *writerClientConfig

	// mu guards access to shared connectionPool instances.
	mu sync.Mutex
	// When multiplexing is enabled, this map retains connectionPools keyed by region ID.
	pools map[string]*connectionPool
}

// NewClient instantiates a new client.
//
// The context provided here is retained and used for background connection management
// between the client and the BigQuery Storage service.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
	// Set a reasonable default for the gRPC connection pool size.
	numConns := runtime.GOMAXPROCS(0)
	if numConns > 4 {
		numConns = 4
	}
	o := []option.ClientOption{
		option.WithGRPCConnectionPool(numConns),
	}
	o = append(o, opts...)

	cCtx, cancel := context.WithCancel(ctx)

	rawClient, err := storage.NewBigQueryWriteClient(cCtx, o...)
	if err != nil {
		cancel()
		return nil, err
	}
	rawClient.SetGoogleClientInfo("gccl", internal.Version)

	// Handle project autodetection.
	projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
	if err != nil {
		cancel()
		return nil, err
	}

	return &Client{
		rawClient: rawClient,
		projectID: projectID,
		ctx:       cCtx,
		cancel:    cancel,
		cfg:       newWriterClientConfig(opts...),
		pools:     make(map[string]*connectionPool),
	}, nil
}

// Close releases resources held by the client.
func (c *Client) Close() error {
	// Shutdown the per-region pools.
	c.mu.Lock()
	defer c.mu.Unlock()
	var firstErr error
	for _, pool := range c.pools {
		if err := pool.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	// Close the underlying client stub.
	if err := c.rawClient.Close(); err != nil && firstErr == nil {
		firstErr = err
	}
	// Cancel the retained client context.
	if c.cancel != nil {
		c.cancel()
	}
	return firstErr
}

// NewManagedStream establishes a new managed stream for appending data into a table.
//
// The context provided here is retained for use by the underlying streaming connections
// the managed stream may create.
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
	return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...)
}
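
// Illustrative usage sketch (not part of this file): opening a managed stream
// against the default stream of a table and appending encoded rows. The table
// identifier, the serialized row data (encodedRows, a [][]byte of protobuf-
// encoded messages), and descriptorProto are caller-supplied assumptions; the
// schema descriptor would typically be produced via the adapt subpackage.
//
//	table := managedwriter.TableParentFromParts("myproject", "mydataset", "mytable")
//	ms, err := client.NewManagedStream(ctx,
//		managedwriter.WithDestinationTable(table),
//		managedwriter.WithType(managedwriter.DefaultStream),
//		managedwriter.WithSchemaDescriptor(descriptorProto),
//	)
//	if err != nil {
//		// TODO: handle error.
//	}
//	result, err := ms.AppendRows(ctx, encodedRows)
//	if err != nil {
//		// TODO: handle error.
//	}
//	// Block until the append is acknowledged by the service.
//	if _, err := result.GetResult(ctx); err != nil {
//		// TODO: handle error.
//	}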

// createOpenF builds the opener function we need to access the AppendRows bidi stream.
func createOpenF(streamFunc streamClientFunc, routingHeader string) func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
	return func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
		if routingHeader != "" {
			ctx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", routingHeader)
		}
		arc, err := streamFunc(ctx, opts...)
		if err != nil {
			return nil, err
		}
		return arc, nil
	}
}

func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) {
	// First, we create a minimal managed stream.
	writer := &ManagedStream{
		id:             newUUID(writerIDPrefix),
		c:              c,
		streamSettings: defaultStreamSettings(),
		curTemplate:    newVersionedTemplate(),
	}
	// Apply writer options.
	for _, opt := range opts {
		opt(writer)
	}

	// skipSetup allows for customization at test time.
	// Examine the configured writer and apply settings to the real one.
	if !skipSetup {
		if err := c.validateOptions(ctx, writer); err != nil {
			return nil, err
		}

		if writer.streamSettings.streamID == "" {
			// Not instantiated with a stream, so construct one.
			streamName := fmt.Sprintf("%s/streams/_default", writer.streamSettings.destinationTable)
			if writer.streamSettings.streamType != DefaultStream {
				// For everything but a default stream, we create a new stream on behalf of the user.
				req := &storagepb.CreateWriteStreamRequest{
					Parent: writer.streamSettings.destinationTable,
					WriteStream: &storagepb.WriteStream{
						Type: streamTypeToEnum(writer.streamSettings.streamType),
					}}
				resp, err := writer.c.rawClient.CreateWriteStream(ctx, req)
				if err != nil {
					return nil, fmt.Errorf("couldn't create write stream: %w", err)
				}
				streamName = resp.GetName()
			}
			writer.streamSettings.streamID = streamName
		}
	}
	// We maintain a pool per region, and attach all exclusive and multiplex writers to that pool.
	pool, err := c.resolvePool(ctx, writer.streamSettings, streamFunc)
	if err != nil {
		return nil, err
	}
	// Add the writer to the pool.
	if err := pool.addWriter(writer); err != nil {
		return nil, err
	}
	writer.ctx, writer.cancel = context.WithCancel(ctx)

	// Attach any tag keys to the context on the writer, so instrumentation works as expected.
	writer.ctx = setupWriterStatContext(writer)
	return writer, nil
}

// validateOptions is used to validate that we received a sane/compatible set of WriterOptions
// for constructing a new managed stream.
func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
	if ms == nil {
		return fmt.Errorf("no managed stream definition")
	}
	if ms.streamSettings.streamID != "" {
		// User supplied a stream, so we need to verify it exists.
		info, err := c.getWriteStream(ctx, ms.streamSettings.streamID, false)
		if err != nil {
			return fmt.Errorf("a stream name was specified, but lookup of the stream failed: %w", err)
		}
		// Update type and destination based on stream metadata.
		ms.streamSettings.streamType = StreamType(info.Type.String())
		ms.streamSettings.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
	}
	if ms.streamSettings.destinationTable == "" {
		return fmt.Errorf("no destination table specified")
	}
	// We could auto-select DEFAULT here, but let's force users to be specific for now.
	if ms.StreamType() == "" {
		return fmt.Errorf("stream type wasn't specified")
	}
	return nil
}

// resolvePool either returns an existing connectionPool, or returns a new pool if this is the first writer in a given region.
func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	resp, err := c.getWriteStream(ctx, settings.streamID, false)
	if err != nil {
		return nil, err
	}
	loc := resp.GetLocation()
	if pool, ok := c.pools[loc]; ok {
		return pool, nil
	}

	// No existing pool available, create one for the location and add to shared pools.
	pool, err := c.createPool(loc, streamFunc)
	if err != nil {
		return nil, err
	}
	c.pools[loc] = pool
	return pool, nil
}

// createPool builds a connectionPool.
func (c *Client) createPool(location string, streamFunc streamClientFunc) (*connectionPool, error) {
	cCtx, cancel := context.WithCancel(c.ctx)

	if c.cfg == nil {
		cancel()
		return nil, fmt.Errorf("missing client config")
	}

	var routingHeader string
	/*
	 * TODO: set once backend respects the new routing header
	 * if location != "" && c.projectID != "" {
	 *	routingHeader = fmt.Sprintf("write_location=projects/%s/locations/%s", c.projectID, location)
	 * }
	 */

	pool := &connectionPool{
		id:                 newUUID(poolIDPrefix),
		location:           location,
		ctx:                cCtx,
		cancel:             cancel,
		open:               createOpenF(streamFunc, routingHeader),
		callOptions:        c.cfg.defaultAppendRowsCallOptions,
		baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes),
	}
	router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize)
	if err := pool.activateRouter(router); err != nil {
		// Cancel the derived pool context so it isn't leaked on failure.
		cancel()
		return nil, err
	}
	return pool, nil
}

// BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same
// parent table.
//
// Streams must be finalized before commit and cannot be committed multiple
// times. Once a stream is committed, data in the stream becomes available
// for read operations.
func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
	return c.rawClient.BatchCommitWriteStreams(ctx, req, opts...)
}
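
// Illustrative usage sketch (not part of this file): committing a set of
// finalized PENDING streams. The parent table and pendingStreamName are
// assumptions; streams must be finalized (e.g. via ManagedStream.Finalize)
// before they can be committed.
//
//	req := &storagepb.BatchCommitWriteStreamsRequest{
//		Parent:       table,
//		WriteStreams: []string{pendingStreamName},
//	}
//	resp, err := client.BatchCommitWriteStreams(ctx, req)
//	if err != nil {
//		// TODO: handle error.
//	}
//	if len(resp.GetStreamErrors()) > 0 {
//		// TODO: handle per-stream commit errors.
//	}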

// CreateWriteStream creates a write stream to the given table.
// Additionally, every table has a special stream named ‘_default’
// to which data can be written. This stream doesn’t need to be created using
// CreateWriteStream. It is a stream that can be used simultaneously by any
// number of clients. Data written to this stream is considered committed as
// soon as an acknowledgement is received.
func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
	return c.rawClient.CreateWriteStream(ctx, req, opts...)
}
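
// Illustrative usage sketch (not part of this file): explicitly creating a
// PENDING stream on a table. The table identifier is an assumption.
//
//	stream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
//		Parent: table,
//		WriteStream: &storagepb.WriteStream{
//			Type: storagepb.WriteStream_PENDING,
//		},
//	})
//	if err != nil {
//		// TODO: handle error.
//	}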

// GetWriteStream returns information about a given WriteStream.
func (c *Client) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
	return c.rawClient.GetWriteStream(ctx, req, opts...)
}
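
// Illustrative usage sketch (not part of this file), continuing the sketch
// above: fetching full metadata for a stream by name.
//
//	info, err := client.GetWriteStream(ctx, &storagepb.GetWriteStreamRequest{
//		Name: stream.GetName(),
//		View: storagepb.WriteStreamView_FULL,
//	})
//	if err != nil {
//		// TODO: handle error.
//	}
//	_ = info.GetLocation()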

// getWriteStream is an internal version of GetWriteStream used for writer setup and validation.
func (c *Client) getWriteStream(ctx context.Context, streamName string, fullView bool) (*storagepb.WriteStream, error) {
	req := &storagepb.GetWriteStreamRequest{
		Name: streamName,
	}
	if fullView {
		req.View = storagepb.WriteStreamView_FULL
	}
	return c.rawClient.GetWriteStream(ctx, req)
}

// TableParentFromStreamName is a utility function for extracting the parent table
// prefix from a stream name.  When an invalid stream ID is passed, this simply returns
// the original stream name.
func TableParentFromStreamName(streamName string) string {
	// Stream names are prefixed with the parent table:
	// projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}
	parts := strings.SplitN(streamName, "/", 7)
	if len(parts) < 7 {
		// Invalid; just pass back the input.
		return streamName
	}
	return strings.Join(parts[:6], "/")
}
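
// For example (illustrative, not part of this file):
//
//	parent := managedwriter.TableParentFromStreamName(
//		"projects/myproject/datasets/mydataset/tables/mytable/streams/mystream")
//	// parent == "projects/myproject/datasets/mydataset/tables/mytable"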

// TableParentFromParts constructs a table identifier using individual identifiers and
// returns a string in the form "projects/{project}/datasets/{dataset}/tables/{table}".
func TableParentFromParts(projectID, datasetID, tableID string) string {
	return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
}
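
// For example (illustrative, not part of this file):
//
//	parent := managedwriter.TableParentFromParts("myproject", "mydataset", "mytable")
//	// parent == "projects/myproject/datasets/mydataset/tables/mytable"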

// newUUID simplifies generating UUIDs for internal resources.
func newUUID(prefix string) string {
	id := uuid.New()
	return fmt.Sprintf("%s_%s", prefix, id.String())
}

// canMultiplex returns true if the input identifier supports multiplexing.  Currently the only
// stream type that supports multiplexing is the default stream.
func canMultiplex(in string) bool {
	// TODO: strengthen validation
	return strings.HasSuffix(in, "default")
}