...

Source file src/cloud.google.com/go/bigquery/storage_client.go

Documentation: cloud.google.com/go/bigquery

// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"fmt"
	"runtime"

	"cloud.google.com/go/bigquery/internal"
	storage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/internal/detect"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
)

// readClient is a managed BigQuery Storage read client scoped to a single project.
type readClient struct {
	rawClient *storage.BigQueryReadClient
	projectID string

	settings readClientSettings
}

type readClientSettings struct {
	// maxStreamCount caps the number of read streams requested when creating
	// a read session; zero lets the server choose.
	maxStreamCount int
	// maxWorkerCount is the expected number of stream consumers; it is used
	// as the preferred minimum stream count when maxStreamCount is zero.
	maxWorkerCount int
}

func defaultReadClientSettings() readClientSettings {
	maxWorkerCount := runtime.GOMAXPROCS(0)
	return readClientSettings{
		// With zero, the server chooses a number of streams that should
		// produce reasonable throughput.
		maxStreamCount: 0,
		maxWorkerCount: maxWorkerCount,
	}
}

// newReadClient instantiates a new storage read client.
func newReadClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *readClient, err error) {
	// Size the gRPC connection pool to GOMAXPROCS, capped at 4 connections.
	numConns := runtime.GOMAXPROCS(0)
	if numConns > 4 {
		numConns = 4
	}
	o := []option.ClientOption{
		option.WithGRPCConnectionPool(numConns),
		option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, internal.Version)),
	}
	o = append(o, opts...)

	rawClient, err := storage.NewBigQueryReadClient(ctx, o...)
	if err != nil {
		return nil, err
	}
	rawClient.SetGoogleClientInfo("gccl", internal.Version)

	// Handle project autodetection.
	projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
	if err != nil {
		return nil, err
	}

	settings := defaultReadClientSettings()
	rc := &readClient{
		rawClient: rawClient,
		projectID: projectID,
		settings:  settings,
	}

	return rc, nil
}

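// Illustrative sketch (not part of the original source): how a caller inside
// this package might construct and release a readClient. The project ID and
// variable names are assumptions for illustration only.
//
//	ctx := context.Background()
//	rc, err := newReadClient(ctx, "my-project-id")
//	if err != nil {
//		// handle error
//	}
//	defer rc.close()
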
// close releases resources held by the client.
func (c *readClient) close() error {
	if c.rawClient == nil {
		return fmt.Errorf("already closed")
	}
	c.rawClient.Close()
	c.rawClient = nil
	return nil
}

// sessionForTable establishes a new session to fetch from a table using the Storage API.
func (c *readClient) sessionForTable(ctx context.Context, table *Table, ordered bool) (*readSession, error) {
	tableID, err := table.Identifier(StorageAPIResourceID)
	if err != nil {
		return nil, err
	}

	// Copy the client settings so that per-session overrides don't affect other sessions.
	settings := c.settings
	if ordered {
		settings.maxStreamCount = 1
	}

	rs := &readSession{
		ctx:                   ctx,
		table:                 table,
		tableID:               tableID,
		settings:              settings,
		readRowsFunc:          c.rawClient.ReadRows,
		createReadSessionFunc: c.rawClient.CreateReadSession,
	}
	return rs, nil
}

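// Illustrative sketch (not part of the original source): opening a session for
// a table with an existing readClient. The dataset and table names are
// assumptions for illustration; passing ordered=true forces a single stream
// so that row order is preserved.
//
//	tbl := client.Dataset("my_dataset").Table("my_table") // hypothetical table
//	rs, err := rc.sessionForTable(ctx, tbl, false)        // unordered: allow multiple streams
//	if err != nil {
//		// handle error
//	}
//	if err := rs.start(); err != nil {
//		// handle error
//	}
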
// readSession is the abstraction over a storage API read session.
type readSession struct {
	settings readClientSettings

	ctx     context.Context
	table   *Table
	tableID string

	bqSession *storagepb.ReadSession

	// The RPC functions are decoupled from the readClient to enable testing.
	createReadSessionFunc func(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error)
	readRowsFunc          func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error)
}

// start initiates a read session.
func (rs *readSession) start() error {
	var preferredMinStreamCount int32
	maxStreamCount := int32(rs.settings.maxStreamCount)
	if maxStreamCount == 0 {
		preferredMinStreamCount = int32(rs.settings.maxWorkerCount)
	}
	createReadSessionRequest := &storagepb.CreateReadSessionRequest{
		Parent: fmt.Sprintf("projects/%s", rs.table.ProjectID),
		ReadSession: &storagepb.ReadSession{
			Table:      rs.tableID,
			DataFormat: storagepb.DataFormat_ARROW,
		},
		MaxStreamCount:          maxStreamCount,
		PreferredMinStreamCount: preferredMinStreamCount,
	}
	rpcOpts := gax.WithGRPCOptions(
		// The Read API can send batches of up to 128MB, so raise the receive
		// limit slightly above that to leave headroom for message overhead.
		// https://cloud.google.com/bigquery/quotas#storage-limits
		grpc.MaxCallRecvMsgSize(1024 * 1024 * 129),
	)
	session, err := rs.createReadSessionFunc(rs.ctx, createReadSessionRequest, rpcOpts)
	if err != nil {
		return err
	}
	rs.bqSession = session
	return nil
}

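// Illustrative sketch (not part of the original source) of how the settings
// above translate into CreateReadSessionRequest fields, assuming the defaults
// from defaultReadClientSettings on a machine with 8 CPUs:
//
//	// default (unordered) session: let the server pick the stream count,
//	// but prefer at least as many streams as there are workers.
//	MaxStreamCount:          0,
//	PreferredMinStreamCount: 8,
//
//	// ordered session (settings.maxStreamCount forced to 1): a single stream,
//	// and PreferredMinStreamCount stays 0.
//	MaxStreamCount:          1,
//	PreferredMinStreamCount: 0,
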
// readRows returns a more direct iterator to the underlying Storage API row stream.
func (rs *readSession) readRows(req *storagepb.ReadRowsRequest) (storagepb.BigQueryRead_ReadRowsClient, error) {
	if rs.bqSession == nil {
		err := rs.start()
		if err != nil {
			return nil, err
		}
	}
	return rs.readRowsFunc(rs.ctx, req)
}

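// Illustrative sketch (not part of the original source): draining one of the
// session's streams after start() has populated rs.bqSession. The loop
// structure and error handling are assumptions for illustration; a real
// consumer would also decode the serialized Arrow record batches.
//
//	for _, stream := range rs.bqSession.GetStreams() {
//		rows, err := rs.readRows(&storagepb.ReadRowsRequest{
//			ReadStream: stream.GetName(),
//		})
//		if err != nil {
//			// handle error
//		}
//		for {
//			resp, err := rows.Recv()
//			if err == io.EOF {
//				break
//			}
//			if err != nil {
//				// handle error
//			}
//			_ = resp.GetArrowRecordBatch() // serialized Arrow data to decode
//		}
//	}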