1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "fmt"
20 "runtime"
21
22 "cloud.google.com/go/bigquery/internal"
23 storage "cloud.google.com/go/bigquery/storage/apiv1"
24 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
25 "cloud.google.com/go/internal/detect"
26 gax "github.com/googleapis/gax-go/v2"
27 "google.golang.org/api/option"
28 "google.golang.org/grpc"
29 )
30
31
32 type readClient struct {
33 rawClient *storage.BigQueryReadClient
34 projectID string
35
36 settings readClientSettings
37 }
38
// readClientSettings carries the stream-related knobs consulted when a read
// session is created.
type readClientSettings struct {
	// maxStreamCount is sent as the request's MaxStreamCount. When it is zero,
	// maxWorkerCount is sent as the preferred minimum stream count instead.
	maxStreamCount int
	// maxWorkerCount is only used as PreferredMinStreamCount, and only when
	// maxStreamCount is zero.
	maxWorkerCount int
}
43
44 func defaultReadClientSettings() readClientSettings {
45 maxWorkerCount := runtime.GOMAXPROCS(0)
46 return readClientSettings{
47
48 maxStreamCount: 0,
49 maxWorkerCount: maxWorkerCount,
50 }
51 }
52
53
54 func newReadClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *readClient, err error) {
55 numConns := runtime.GOMAXPROCS(0)
56 if numConns > 4 {
57 numConns = 4
58 }
59 o := []option.ClientOption{
60 option.WithGRPCConnectionPool(numConns),
61 option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, internal.Version)),
62 }
63 o = append(o, opts...)
64
65 rawClient, err := storage.NewBigQueryReadClient(ctx, o...)
66 if err != nil {
67 return nil, err
68 }
69 rawClient.SetGoogleClientInfo("gccl", internal.Version)
70
71
72 projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
73 if err != nil {
74 return nil, err
75 }
76
77 settings := defaultReadClientSettings()
78 rc := &readClient{
79 rawClient: rawClient,
80 projectID: projectID,
81 settings: settings,
82 }
83
84 return rc, nil
85 }
86
87
88 func (c *readClient) close() error {
89 if c.rawClient == nil {
90 return fmt.Errorf("already closed")
91 }
92 c.rawClient.Close()
93 c.rawClient = nil
94 return nil
95 }
96
97
98 func (c *readClient) sessionForTable(ctx context.Context, table *Table, ordered bool) (*readSession, error) {
99 tableID, err := table.Identifier(StorageAPIResourceID)
100 if err != nil {
101 return nil, err
102 }
103
104
105 settings := c.settings
106 if ordered {
107 settings.maxStreamCount = 1
108 }
109
110 rs := &readSession{
111 ctx: ctx,
112 table: table,
113 tableID: tableID,
114 settings: settings,
115 readRowsFunc: c.rawClient.ReadRows,
116 createReadSessionFunc: c.rawClient.CreateReadSession,
117 }
118 return rs, nil
119 }
120
121
122 type readSession struct {
123 settings readClientSettings
124
125 ctx context.Context
126 table *Table
127 tableID string
128
129 bqSession *storagepb.ReadSession
130
131
132 createReadSessionFunc func(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error)
133 readRowsFunc func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error)
134 }
135
136
137 func (rs *readSession) start() error {
138 var preferredMinStreamCount int32
139 maxStreamCount := int32(rs.settings.maxStreamCount)
140 if maxStreamCount == 0 {
141 preferredMinStreamCount = int32(rs.settings.maxWorkerCount)
142 }
143 createReadSessionRequest := &storagepb.CreateReadSessionRequest{
144 Parent: fmt.Sprintf("projects/%s", rs.table.ProjectID),
145 ReadSession: &storagepb.ReadSession{
146 Table: rs.tableID,
147 DataFormat: storagepb.DataFormat_ARROW,
148 },
149 MaxStreamCount: maxStreamCount,
150 PreferredMinStreamCount: preferredMinStreamCount,
151 }
152 rpcOpts := gax.WithGRPCOptions(
153
154
155 grpc.MaxCallRecvMsgSize(1024 * 1024 * 129),
156 )
157 session, err := rs.createReadSessionFunc(rs.ctx, createReadSessionRequest, rpcOpts)
158 if err != nil {
159 return err
160 }
161 rs.bqSession = session
162 return nil
163 }
164
165
166 func (rs *readSession) readRows(req *storagepb.ReadRowsRequest) (storagepb.BigQueryRead_ReadRowsClient, error) {
167 if rs.bqSession == nil {
168 err := rs.start()
169 if err != nil {
170 return nil, err
171 }
172 }
173 return rs.readRowsFunc(rs.ctx, req)
174 }
175
View as plain text