1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "sync"
23
24 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
25 "github.com/googleapis/gax-go/v2"
26 "go.opencensus.io/tag"
27 "google.golang.org/grpc/codes"
28 grpcstatus "google.golang.org/grpc/status"
29 )
30
// Prefixes for generated identifiers. connIDPrefix is passed to newUUID when
// a connection is created in this file; the pool and writer prefixes are not
// referenced in this part of the file.
// NOTE(review): presumably they are used the same way for pool and writer IDs — confirm.
const (
	poolIDPrefix   string = "connectionpool"
	connIDPrefix   string = "connection"
	writerIDPrefix string = "writer"
)
36
var (
	// errNoRouterForPool is returned by pool operations that need a router
	// (selectConn, addWriter, removeWriter) when none has been activated.
	errNoRouterForPool = errors.New("no router for connection pool")
)
40
41
42
43
44
// connectionPool holds the state shared by a set of append connections: a
// cancellable context, default flow-control limits, the function used to open
// streams together with its call options, and the router that maps writes to
// connections.
type connectionPool struct {
	// id identifies the pool; it is reported in addWriter's error message.
	id string
	// location is not referenced in this part of the file.
	// NOTE(review): presumably the region this pool serves — confirm against callers.
	location string

	// ctx is the parent of each connection context created by newConnection and
	// is the context used for pool-level metrics and retry sleeps.
	// cancel, when non-nil, is invoked by Close.
	ctx    context.Context
	cancel context.CancelFunc

	// baseFlowController supplies the default request/byte limits for the
	// per-connection flow controllers built in newConnection. May be nil.
	baseFlowController *flowController

	// open opens a new AppendRows client stream; it is called by openWithRetry
	// with the connection's context and the merged call options.
	open func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

	// callOptions are pool-level call options; mergeCallOptions places them
	// ahead of any connection-level options.
	callOptions []gax.CallOption

	// router is set once by activateRouter and handles connection selection and
	// writer attach/detach. It is nil until a router has been activated.
	router poolRouter

	// retry, when non-nil, is the retryer returned by defaultRetryer.
	retry *statelessRetryer
}
67
68
69 func (pool *connectionPool) activateRouter(rtr poolRouter) error {
70 if pool.router != nil {
71 return fmt.Errorf("router already activated")
72 }
73 if err := rtr.poolAttach(pool); err != nil {
74 return fmt.Errorf("router rejected attach: %w", err)
75 }
76 pool.router = rtr
77 return nil
78 }
79
80 func (pool *connectionPool) Close() error {
81
82 var err error
83 if pool.router != nil {
84 err = pool.router.poolDetach()
85 }
86 if cancel := pool.cancel; cancel != nil {
87 cancel()
88 }
89 return err
90 }
91
92
93 func (pool *connectionPool) selectConn(pw *pendingWrite) (*connection, error) {
94 if pool.router == nil {
95 return nil, errNoRouterForPool
96 }
97 return pool.router.pickConnection(pw)
98 }
99
100 func (pool *connectionPool) addWriter(writer *ManagedStream) error {
101 if p := writer.pool; p != nil {
102 return fmt.Errorf("writer already attached to pool %q", p.id)
103 }
104 if pool.router == nil {
105 return errNoRouterForPool
106 }
107 if err := pool.router.writerAttach(writer); err != nil {
108 return err
109 }
110 writer.pool = pool
111 return nil
112 }
113
114 func (pool *connectionPool) removeWriter(writer *ManagedStream) error {
115 if pool.router == nil {
116 return errNoRouterForPool
117 }
118 detachErr := pool.router.writerDetach(writer)
119 return detachErr
120 }
121
122 func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption {
123 if co == nil {
124 return cp.callOptions
125 }
126 var mergedOpts []gax.CallOption
127 mergedOpts = append(mergedOpts, cp.callOptions...)
128 mergedOpts = append(mergedOpts, co.callOptions...)
129 return mergedOpts
130 }
131
132
133
134
135
136 func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
137 r := &unaryRetryer{}
138 for {
139 arc, err := cp.open(co.ctx, cp.mergeCallOptions(co)...)
140 metricCtx := cp.ctx
141 if err == nil {
142
143 recordStat(metricCtx, AppendClientOpenCount, 1)
144 }
145 if err != nil {
146 if tagCtx, tagErr := tag.New(cp.ctx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil {
147 metricCtx = tagCtx
148 }
149
150 recordStat(metricCtx, AppendClientOpenCount, 1)
151 bo, shouldRetry := r.Retry(err)
152 if shouldRetry {
153 recordStat(cp.ctx, AppendClientOpenRetryCount, 1)
154 if err := gax.Sleep(cp.ctx, bo); err != nil {
155 return nil, nil, err
156 }
157 continue
158 } else {
159
160 return nil, nil, err
161 }
162 }
163
164
165
166
167 depth := 1000
168 if d := co.fc.maxInsertCount; d > 0 {
169 depth = d
170 }
171 ch := make(chan *pendingWrite, depth)
172 go connRecvProcessor(co.ctx, co, arc, ch)
173 return arc, ch, nil
174 }
175 }
176
177
178
179 func (cp *connectionPool) defaultRetryer() *statelessRetryer {
180 if cp.retry != nil {
181 return cp.retry
182 }
183 return &statelessRetryer{
184 maxAttempts: 1,
185 }
186 }
187
188
189
190
// connection represents one append stream within a connectionPool, together
// with the flow controller, context, and bookkeeping needed to send writes on
// it and to reopen it.
type connection struct {
	// id is generated by newUUID(connIDPrefix) in newConnection.
	id string
	// pool is the owning pool; getStream uses it to reopen the stream and to
	// derive fresh contexts.
	pool *connectionPool

	// fc bounds in-flight writes on this connection; lockingAppend acquires
	// from it and release/lockingAppend return capacity to it.
	fc *flowController
	// callOptions are connection-level call options appended after the pool's
	// options by mergeCallOptions.
	callOptions []gax.CallOption
	// ctx is derived from the pool context; cancel cancels it. getStream
	// replaces both when it reopens the stream, and close clears cancel.
	ctx    context.Context
	cancel context.CancelFunc

	// retry is not referenced in this part of the file.
	retry *statelessRetryer
	// optimizer, when non-nil, performs the send in lockingAppend and is reset
	// on send errors and on reconnects. It is chosen from the connection mode.
	optimizer sendOptimizer

	// mu is held by close and lockingAppend (and therefore around getStream)
	// while they read or modify the fields below.
	mu sync.Mutex
	// arc is the current stream, nil until first opened or after close.
	arc *storagepb.BigQueryWrite_AppendRowsClient
	// reconnect requests that the next getStream call reopen the stream.
	reconnect bool
	// err, once non-nil, is returned by every subsequent getStream call.
	err error
	// pending carries sent writes to the receive processor for the current stream.
	pending chan *pendingWrite

	// Thresholds computed by computeLoadThresholds and consulted by isLoaded
	// and curLoad.
	loadBytesThreshold int
	loadCountThreshold int
}
212
// connectionMode names how a connection sends requests; optimizer maps each
// mode to the sendOptimizer implementation used by the connection.
type connectionMode string

const (
	// multiplexConnectionMode selects the multiplexOptimizer.
	multiplexConnectionMode connectionMode = "MULTIPLEX"
	// simplexConnectionMode selects the simplexOptimizer.
	simplexConnectionMode connectionMode = "SIMPLEX"
	// verboseConnectionMode selects the verboseOptimizer.
	verboseConnectionMode connectionMode = "VERBOSE"
)
220
221 func newConnection(pool *connectionPool, mode connectionMode, settings *streamSettings) *connection {
222 if pool == nil {
223 return nil
224 }
225
226 connCtx, cancel := context.WithCancel(pool.ctx)
227
228
229 fcRequests := 0
230 fcBytes := 0
231 var opts []gax.CallOption
232
233 if pool.baseFlowController != nil {
234 fcRequests = pool.baseFlowController.maxInsertCount
235 fcBytes = pool.baseFlowController.maxInsertBytes
236 }
237 if settings != nil {
238 if settings.MaxInflightRequests > 0 {
239 fcRequests = settings.MaxInflightRequests
240 }
241 if settings.MaxInflightBytes > 0 {
242 fcBytes = settings.MaxInflightBytes
243 }
244 opts = settings.appendCallOptions
245 }
246 fc := newFlowController(fcRequests, fcBytes)
247 countLimit, byteLimit := computeLoadThresholds(fc)
248
249 return &connection{
250 id: newUUID(connIDPrefix),
251 pool: pool,
252 fc: fc,
253 ctx: connCtx,
254 cancel: cancel,
255 optimizer: optimizer(mode),
256 loadBytesThreshold: byteLimit,
257 loadCountThreshold: countLimit,
258 callOptions: opts,
259 }
260 }
261
262 func computeLoadThresholds(fc *flowController) (countLimit, byteLimit int) {
263 countLimit = 1000
264 byteLimit = 0
265 if fc != nil {
266 if fc.maxInsertBytes > 0 {
267
268 byteLimit = int(float64(fc.maxInsertBytes) * 0.2)
269 }
270 if fc.maxInsertCount > 0 {
271
272 countLimit = int(float64(fc.maxInsertCount) * 0.2)
273 if countLimit < 1 {
274 countLimit = 1
275 }
276 }
277 }
278 return
279 }
280
281 func optimizer(mode connectionMode) sendOptimizer {
282 switch mode {
283 case multiplexConnectionMode:
284 return &multiplexOptimizer{}
285 case verboseConnectionMode:
286 return &verboseOptimizer{}
287 case simplexConnectionMode:
288 return &simplexOptimizer{}
289 }
290 return nil
291 }
292
293
294 func (co *connection) release(pw *pendingWrite) {
295 co.fc.release(pw.reqSize)
296 }
297
298
299 func (co *connection) isLoaded() bool {
300 if co.loadCountThreshold > 0 && co.fc.count() > co.loadCountThreshold {
301 return true
302 }
303 if co.loadBytesThreshold > 0 && co.fc.bytes() > co.loadBytesThreshold {
304 return true
305 }
306 return false
307 }
308
309
310
311 func (co *connection) curLoad() float64 {
312 load := float64(co.fc.count()) / float64(co.loadCountThreshold+1)
313 if co.fc.maxInsertBytes > 0 {
314 load += (float64(co.fc.bytes()) / float64(co.loadBytesThreshold+1))
315 load = load / 2
316 }
317 return load
318 }
319
320
321 func (co *connection) close() {
322 co.mu.Lock()
323 defer co.mu.Unlock()
324
325 if co.cancel != nil {
326 co.cancel()
327 co.cancel = nil
328 }
329
330 if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) {
331 (*co.arc).CloseSend()
332 co.arc = nil
333 }
334
335 if co.err != nil {
336 co.err = io.EOF
337 }
338
339 if co.pending != nil {
340 close(co.pending)
341 }
342 }
343
344
345 func (co *connection) lockingAppend(pw *pendingWrite) error {
346
347 if err := pw.reqCtx.Err(); err != nil {
348 return err
349 }
350
351 if err := co.fc.acquire(pw.reqCtx, pw.reqSize); err != nil {
352
353 return err
354 }
355
356 var statsOnExit func(ctx context.Context)
357
358
359
360
361
362
363 co.mu.Lock()
364 defer func() {
365 sCtx := co.ctx
366 co.mu.Unlock()
367 if statsOnExit != nil && sCtx != nil {
368 statsOnExit(sCtx)
369 }
370 }()
371
372 var arc *storagepb.BigQueryWrite_AppendRowsClient
373 var ch chan *pendingWrite
374 var err error
375
376
377
378
379 forceReconnect := false
380 promoted := false
381 if pw.writer != nil && pw.reqTmpl != nil {
382 if !pw.reqTmpl.Compatible(pw.writer.curTemplate) {
383 if pw.writer.curTemplate == nil {
384
385 pw.writer.curTemplate = pw.reqTmpl
386 promoted = true
387 } else {
388 if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) {
389 pw.writer.curTemplate = pw.reqTmpl
390 promoted = true
391 }
392 }
393 }
394 }
395 if promoted {
396 if co.optimizer == nil {
397 forceReconnect = true
398 } else {
399 if !co.optimizer.isMultiplexing() {
400 forceReconnect = true
401 }
402 }
403 }
404
405 arc, ch, err = co.getStream(arc, forceReconnect)
406 if err != nil {
407 return err
408 }
409
410 pw.attemptCount = pw.attemptCount + 1
411 if co.optimizer != nil {
412 err = co.optimizer.optimizeSend((*arc), pw)
413 if err != nil {
414
415 co.optimizer.signalReset()
416 }
417 } else {
418
419 err = (*arc).Send(pw.constructFullRequest(true))
420 }
421 if err != nil {
422
423 co.fc.release(pw.reqSize)
424 if shouldReconnect(err) {
425 metricCtx := co.ctx
426 if pw.writer != nil {
427 metricCtx = pw.writer.ctx
428 }
429 if tagCtx, tagErr := tag.New(metricCtx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil {
430 metricCtx = tagCtx
431 }
432 recordStat(metricCtx, AppendRequestReconnects, 1)
433
434 co.reconnect = true
435 }
436 return err
437 }
438
439
440
441 var numRows int64
442 if r := pw.req.GetProtoRows(); r != nil {
443 if pr := r.GetRows(); pr != nil {
444 numRows = int64(len(pr.GetSerializedRows()))
445 }
446 }
447 statsOnExit = func(ctx context.Context) {
448
449
450 recordStat(ctx, AppendRequestRows, numRows)
451 recordStat(ctx, AppendRequests, 1)
452 recordStat(ctx, AppendRequestBytes, int64(pw.reqSize))
453 }
454 ch <- pw
455 return nil
456 }
457
458
459
460
461 func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
462 if co.err != nil {
463 return nil, nil, co.err
464 }
465 co.err = co.ctx.Err()
466 if co.err != nil {
467 return nil, nil, co.err
468 }
469
470
471 if co.reconnect {
472 forceReconnect = true
473 co.reconnect = false
474 }
475
476 if arc != co.arc && !forceReconnect {
477 return co.arc, co.pending, nil
478 }
479
480 if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) {
481 (*co.arc).CloseSend()
482 }
483 if co.pending != nil {
484 close(co.pending)
485 }
486 if co.cancel != nil {
487 co.cancel()
488 co.ctx, co.cancel = context.WithCancel(co.pool.ctx)
489 }
490
491 co.arc = new(storagepb.BigQueryWrite_AppendRowsClient)
492
493 if co.optimizer != nil {
494 co.optimizer.signalReset()
495 }
496 *co.arc, co.pending, co.err = co.pool.openWithRetry(co)
497 return co.arc, co.pending, co.err
498 }
499
500
// streamClientFunc is a function that opens an AppendRows client stream; it
// has the same signature as connectionPool.open.
type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

// errConnectionCanceled is the gRPC status error (code Canceled) that
// connRecvProcessor reports in place of context.Canceled when its context ends.
var errConnectionCanceled = grpcstatus.Error(codes.Canceled, "client connection context was canceled")
504
505
506
507
508 func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
509 for {
510 select {
511 case <-ctx.Done():
512
513
514 doneErr := ctx.Err()
515 if doneErr == context.Canceled {
516
517
518
519
520
521
522 doneErr = errConnectionCanceled
523 }
524 for {
525 pw, ok := <-ch
526 if !ok {
527 return
528 }
529
530 co.release(pw)
531
532
533
534
535 pw.writer.processRetry(pw, co, nil, doneErr)
536 }
537 case nextWrite, ok := <-ch:
538 if !ok {
539
540 return
541 }
542
543 resp, err := arc.Recv()
544 co.release(nextWrite)
545 if err != nil {
546
547
548 status := grpcstatus.Convert(err)
549 metricCtx := ctx
550 if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.Code()).String())); tagErr == nil {
551 metricCtx = tagCtx
552 }
553 recordStat(metricCtx, AppendResponseErrors, 1)
554
555 nextWrite.writer.processRetry(nextWrite, co, nil, err)
556 continue
557 }
558
559 recordStat(ctx, AppendResponses, 1)
560
561 if status := resp.GetError(); status != nil {
562
563
564 metricCtx := ctx
565 if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
566 metricCtx = tagCtx
567 }
568 recordStat(metricCtx, AppendResponseErrors, 1)
569 respErr := grpcstatus.ErrorProto(status)
570
571 nextWrite.writer.processRetry(nextWrite, co, resp, respErr)
572
573 continue
574 }
575
576 nextWrite.markDone(resp, nil)
577 }
578 }
579 }
580
View as plain text