1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "fmt"
20 "runtime"
21 "strings"
22 "sync"
23
24 "cloud.google.com/go/bigquery/internal"
25 storage "cloud.google.com/go/bigquery/storage/apiv1"
26 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
27 "cloud.google.com/go/internal/detect"
28 "github.com/google/uuid"
29 "github.com/googleapis/gax-go/v2"
30 "google.golang.org/api/option"
31 "google.golang.org/grpc/metadata"
32 )
33
34
35
36
37
38
39
40
// DetectProjectID is a sentinel string that may be supplied in place of a
// concrete project ID.
//
// NOTE(review): NewClient forwards its projectID argument to detect.ProjectID,
// which presumably recognizes this value and resolves the project from the
// supplied or default credentials — confirm against the detect package.
const DetectProjectID = "*detect-project-id*"
42
43
// Client bundles the underlying BigQuery Storage write client with the
// configuration and connection pools shared by the writers it creates.
type Client struct {
	// rawClient is the generated storage write client that all RPCs go through.
	rawClient *storage.BigQueryWriteClient
	// projectID is the project ID as resolved by NewClient.
	projectID string

	// ctx is the cancellable context derived in NewClient; createPool derives
	// each pool's context from it. cancel is invoked by Close.
	ctx    context.Context
	cancel context.CancelFunc

	// cfg holds the settings produced by newWriterClientConfig from the
	// client options passed to NewClient.
	cfg *writerClientConfig

	// mu guards pools.
	mu sync.Mutex
	// pools retains the connection pools created so far, keyed by the
	// location reported for a write stream.
	pools map[string]*connectionPool
}
61
62
63
64
65
66 func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
67
68 numConns := runtime.GOMAXPROCS(0)
69 if numConns > 4 {
70 numConns = 4
71 }
72 o := []option.ClientOption{
73 option.WithGRPCConnectionPool(numConns),
74 }
75 o = append(o, opts...)
76
77 cCtx, cancel := context.WithCancel(ctx)
78
79 rawClient, err := storage.NewBigQueryWriteClient(cCtx, o...)
80 if err != nil {
81 cancel()
82 return nil, err
83 }
84 rawClient.SetGoogleClientInfo("gccl", internal.Version)
85
86
87 projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
88 if err != nil {
89 cancel()
90 return nil, err
91 }
92
93 return &Client{
94 rawClient: rawClient,
95 projectID: projectID,
96 ctx: cCtx,
97 cancel: cancel,
98 cfg: newWriterClientConfig(opts...),
99 pools: make(map[string]*connectionPool),
100 }, nil
101 }
102
103
104 func (c *Client) Close() error {
105
106
107 c.mu.Lock()
108 defer c.mu.Unlock()
109 var firstErr error
110 for _, pool := range c.pools {
111 if err := pool.Close(); err != nil && firstErr == nil {
112 firstErr = err
113 }
114 }
115
116
117 if err := c.rawClient.Close(); err != nil && firstErr == nil {
118 firstErr = err
119 }
120
121 if c.cancel != nil {
122 c.cancel()
123 }
124 return firstErr
125 }
126
127
128
129
130 func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
131 return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...)
132 }
133
134
135 func createOpenF(streamFunc streamClientFunc, routingHeader string) func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
136 return func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
137 if routingHeader != "" {
138 ctx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", routingHeader)
139 }
140 arc, err := streamFunc(ctx, opts...)
141 if err != nil {
142 return nil, err
143 }
144 return arc, nil
145 }
146 }
147
148 func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) {
149
150 writer := &ManagedStream{
151 id: newUUID(writerIDPrefix),
152 c: c,
153 streamSettings: defaultStreamSettings(),
154 curTemplate: newVersionedTemplate(),
155 }
156
157 for _, opt := range opts {
158 opt(writer)
159 }
160
161
162
163 if !skipSetup {
164 if err := c.validateOptions(ctx, writer); err != nil {
165 return nil, err
166 }
167
168 if writer.streamSettings.streamID == "" {
169
170 streamName := fmt.Sprintf("%s/streams/_default", writer.streamSettings.destinationTable)
171 if writer.streamSettings.streamType != DefaultStream {
172
173 req := &storagepb.CreateWriteStreamRequest{
174 Parent: writer.streamSettings.destinationTable,
175 WriteStream: &storagepb.WriteStream{
176 Type: streamTypeToEnum(writer.streamSettings.streamType),
177 }}
178 resp, err := writer.c.rawClient.CreateWriteStream(ctx, req)
179 if err != nil {
180 return nil, fmt.Errorf("couldn't create write stream: %w", err)
181 }
182 streamName = resp.GetName()
183 }
184 writer.streamSettings.streamID = streamName
185 }
186 }
187
188 pool, err := c.resolvePool(ctx, writer.streamSettings, streamFunc)
189 if err != nil {
190 return nil, err
191 }
192
193 if err := pool.addWriter(writer); err != nil {
194 return nil, err
195 }
196 writer.ctx, writer.cancel = context.WithCancel(ctx)
197
198
199 writer.ctx = setupWriterStatContext(writer)
200 return writer, nil
201 }
202
203
204
205 func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
206 if ms == nil {
207 return fmt.Errorf("no managed stream definition")
208 }
209 if ms.streamSettings.streamID != "" {
210
211 info, err := c.getWriteStream(ctx, ms.streamSettings.streamID, false)
212 if err != nil {
213 return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err)
214 }
215
216 ms.streamSettings.streamType = StreamType(info.Type.String())
217 ms.streamSettings.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
218 }
219 if ms.streamSettings.destinationTable == "" {
220 return fmt.Errorf("no destination table specified")
221 }
222
223 if ms.StreamType() == "" {
224 return fmt.Errorf("stream type wasn't specified")
225 }
226 return nil
227 }
228
229
230 func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
231 c.mu.Lock()
232 defer c.mu.Unlock()
233 resp, err := c.getWriteStream(ctx, settings.streamID, false)
234 if err != nil {
235 return nil, err
236 }
237 loc := resp.GetLocation()
238 if pool, ok := c.pools[loc]; ok {
239 return pool, nil
240 }
241
242
243 pool, err := c.createPool(loc, streamFunc)
244 if err != nil {
245 return nil, err
246 }
247 c.pools[loc] = pool
248 return pool, nil
249 }
250
251
252 func (c *Client) createPool(location string, streamFunc streamClientFunc) (*connectionPool, error) {
253 cCtx, cancel := context.WithCancel(c.ctx)
254
255 if c.cfg == nil {
256 cancel()
257 return nil, fmt.Errorf("missing client config")
258 }
259
260 var routingHeader string
261
267
268 pool := &connectionPool{
269 id: newUUID(poolIDPrefix),
270 location: location,
271 ctx: cCtx,
272 cancel: cancel,
273 open: createOpenF(streamFunc, routingHeader),
274 callOptions: c.cfg.defaultAppendRowsCallOptions,
275 baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes),
276 }
277 router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize)
278 if err := pool.activateRouter(router); err != nil {
279 return nil, err
280 }
281 return pool, nil
282 }
283
284
285
286
287
288
289
290 func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
291 return c.rawClient.BatchCommitWriteStreams(ctx, req, opts...)
292 }
293
294
295
296
297
298
299
300 func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
301 return c.rawClient.CreateWriteStream(ctx, req, opts...)
302 }
303
304
305 func (c *Client) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
306 return c.rawClient.GetWriteStream(ctx, req, opts...)
307 }
308
309
310 func (c *Client) getWriteStream(ctx context.Context, streamName string, fullView bool) (*storagepb.WriteStream, error) {
311 req := &storagepb.GetWriteStreamRequest{
312 Name: streamName,
313 }
314 if fullView {
315 req.View = storagepb.WriteStreamView_FULL
316 }
317 return c.rawClient.GetWriteStream(ctx, req)
318 }
319
320
321
322
// TableParentFromStreamName returns the leading six slash-separated segments
// of streamName, i.e. everything before the sixth "/". For a name shaped like
// projects/{p}/datasets/{d}/tables/{t}/streams/{s} that is the table path.
//
// If streamName contains fewer than six "/" separators it is returned
// unchanged.
func TableParentFromStreamName(streamName string) string {
	const parentSegments = 6

	// Walk forward one separator at a time; end tracks the offset just past
	// the most recently consumed "/".
	end := 0
	for seen := 0; seen < parentSegments; seen++ {
		next := strings.IndexByte(streamName[end:], '/')
		if next < 0 {
			// Too few segments to contain a stream suffix.
			return streamName
		}
		end += next + 1
	}
	// Drop the sixth separator itself.
	return streamName[:end-1]
}
333
334
335
// TableParentFromParts assembles a table path of the form
// projects/{projectID}/datasets/{datasetID}/tables/{tableID} from its
// components. The components are inserted verbatim, with no validation.
func TableParentFromParts(projectID, datasetID, tableID string) string {
	const layout = "projects/%s/datasets/%s/tables/%s"
	return fmt.Sprintf(layout, projectID, datasetID, tableID)
}
339
340
341 func newUUID(prefix string) string {
342 id := uuid.New()
343 return fmt.Sprintf("%s_%s", prefix, id.String())
344 }
345
346
347
// canMultiplex reports whether in ends with the literal suffix "default".
//
// NOTE(review): presumably in is a write stream name and this identifies
// default streams as eligible for multiplexing — confirm against callers.
func canMultiplex(in string) bool {
	const defaultSuffix = "default"
	return strings.HasSuffix(in, defaultSuffix)
}
352
View as plain text