
Source file src/cloud.google.com/go/bigquery/storage/managedwriter/connection.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"sync"
    23  
    24  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    25  	"github.com/googleapis/gax-go/v2"
    26  	"go.opencensus.io/tag"
    27  	"google.golang.org/grpc/codes"
    28  	grpcstatus "google.golang.org/grpc/status"
    29  )
    30  
    31  const (
    32  	poolIDPrefix   string = "connectionpool"
    33  	connIDPrefix   string = "connection"
    34  	writerIDPrefix string = "writer"
    35  )
    36  
    37  var (
    38  	errNoRouterForPool = errors.New("no router for connection pool")
    39  )
    40  
    41  // connectionPool represents a pooled set of connections.
    42  //
    43  // The pool retains references to connections, and maintains the mapping between writers
    44  // and connections.
    45  type connectionPool struct {
    46  	id       string
    47  	location string // BQ region associated with this pool.
    48  
    49  	// the pool retains the long-lived context responsible for opening/maintaining bidi connections.
    50  	ctx    context.Context
    51  	cancel context.CancelFunc
    52  
    53  	baseFlowController *flowController // template flow controller used for building connections.
    54  
    55  	// We centralize the open function on the pool, rather than having an instance of the open func on every
    56  	// connection.  Opening the connection is a stateless operation.
    57  	open func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
    58  
    59  	// We specify default calloptions for the pool.
    60  	// Explicit connections may have their own calloptions as well.
    61  	callOptions []gax.CallOption
    62  
    63  	router poolRouter // the router makes the decisions about connections and routing.
    64  
    65  	retry *statelessRetryer // default retryer for the pool.
    66  }
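
The pool's open func is supplied at construction time and is deliberately stateless; in rough terms it just wraps the raw AppendRows bidi call on the underlying v1 write client. A minimal sketch, assuming a previously created *storage.BigQueryWriteClient named rawClient (from cloud.google.com/go/bigquery/storage/apiv1, not shown in this file):

	pool := &connectionPool{
		// ...other fields elided...
		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
			// rawClient is an assumed raw write client; the pool only needs
			// the stateless call that opens a new bidi stream.
			return rawClient.AppendRows(ctx, opts...)
		},
	}
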
    67  
    68  // activateRouter handles wiring up a connection pool and its router.
    69  func (pool *connectionPool) activateRouter(rtr poolRouter) error {
    70  	if pool.router != nil {
    71  		return fmt.Errorf("router already activated")
    72  	}
    73  	if err := rtr.poolAttach(pool); err != nil {
    74  		return fmt.Errorf("router rejected attach: %w", err)
    75  	}
    76  	pool.router = rtr
    77  	return nil
    78  }
    79  
    80  func (pool *connectionPool) Close() error {
    81  	// Signal router and cancel context, which should propagate to all writers.
    82  	var err error
    83  	if pool.router != nil {
    84  		err = pool.router.poolDetach()
    85  	}
    86  	if cancel := pool.cancel; cancel != nil {
    87  		cancel()
    88  	}
    89  	return err
    90  }
    91  
    92  // selectConn is used by writers to select a connection; it delegates the choice to the pool's router.
    93  func (pool *connectionPool) selectConn(pw *pendingWrite) (*connection, error) {
    94  	if pool.router == nil {
    95  		return nil, errNoRouterForPool
    96  	}
    97  	return pool.router.pickConnection(pw)
    98  }
    99  
   100  func (pool *connectionPool) addWriter(writer *ManagedStream) error {
   101  	if p := writer.pool; p != nil {
   102  		return fmt.Errorf("writer already attached to pool %q", p.id)
   103  	}
   104  	if pool.router == nil {
   105  		return errNoRouterForPool
   106  	}
   107  	if err := pool.router.writerAttach(writer); err != nil {
   108  		return err
   109  	}
   110  	writer.pool = pool
   111  	return nil
   112  }
   113  
   114  func (pool *connectionPool) removeWriter(writer *ManagedStream) error {
   115  	if pool.router == nil {
   116  		return errNoRouterForPool
   117  	}
   118  	detachErr := pool.router.writerDetach(writer)
   119  	return detachErr
   120  }
   121  
   122  func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption {
   123  	if co == nil {
   124  		return cp.callOptions
   125  	}
   126  	var mergedOpts []gax.CallOption
   127  	mergedOpts = append(mergedOpts, cp.callOptions...)
   128  	mergedOpts = append(mergedOpts, co.callOptions...)
   129  	return mergedOpts
   130  }
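
The merge is purely additive: the pool's defaults are appended first and the connection's own options after them, so per-connection options appear later in the slice. A hypothetical sketch (the specific options are illustrative only and would need a google.golang.org/grpc import):

	pool.callOptions = []gax.CallOption{
		gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(16 * 1024 * 1024)), // pool-wide default
	}
	co := &connection{
		callOptions: []gax.CallOption{
			gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(32 * 1024 * 1024)), // connection-specific extra
		},
	}
	merged := pool.mergeCallOptions(co) // pool defaults first, then co's options
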
   131  
   132  // openWithRetry establishes a new bidi stream and channel pair.  It is used by connection objects
   133  // when (re)opening the network connection to the backend.
   134  //
   135  // The connection.getStream() func should be the only consumer of this.
   136  func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
   137  	r := &unaryRetryer{}
   138  	for {
   139  		arc, err := cp.open(co.ctx, cp.mergeCallOptions(co)...)
   140  		metricCtx := cp.ctx
   141  		if err == nil {
   142  			// accumulate AppendClientOpenCount for the success case.
   143  			recordStat(metricCtx, AppendClientOpenCount, 1)
   144  		}
   145  		if err != nil {
   146  			if tagCtx, tagErr := tag.New(cp.ctx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil {
   147  				metricCtx = tagCtx
   148  			}
   149  			// accumulate AppendClientOpenCount for the error case.
   150  			recordStat(metricCtx, AppendClientOpenCount, 1)
   151  			bo, shouldRetry := r.Retry(err)
   152  			if shouldRetry {
   153  				recordStat(cp.ctx, AppendClientOpenRetryCount, 1)
   154  				if err := gax.Sleep(cp.ctx, bo); err != nil {
   155  					return nil, nil, err
   156  				}
   157  				continue
   158  			} else {
   159  				// non-retriable error while opening
   160  				return nil, nil, err
   161  			}
   162  		}
   163  
   164  		// The channel relationship with its ARC is 1:1.  If we get a new ARC, create a new pending
   165  		// write channel and fire up the associated receive processor.  The channel ensures that
   166  		// responses for a connection are processed in the same order that appends were sent.
   167  		depth := 1000 // default backend queue limit
   168  		if d := co.fc.maxInsertCount; d > 0 {
   169  			depth = d
   170  		}
   171  		ch := make(chan *pendingWrite, depth)
   172  		go connRecvProcessor(co.ctx, co, arc, ch)
   173  		return arc, ch, nil
   174  	}
   175  }
   176  
   177  // defaultRetryer returns the stateless default retryer for the pool.  If one's not set (re-enqueue
   178  // retries disabled), it returns a retryer that only permits single attempts.
   179  func (cp *connectionPool) defaultRetryer() *statelessRetryer {
   180  	if cp.retry != nil {
   181  		return cp.retry
   182  	}
   183  	return &statelessRetryer{
   184  		maxAttempts: 1,
   185  	}
   186  }
   187  
   188  // connection models the underlying AppendRows grpc bidi connection used for writing
   189  // data and receiving acknowledgements.  It is responsible for enqueing writes and processing
   190  // responses from the backend.
   191  type connection struct {
   192  	id   string
   193  	pool *connectionPool // each connection retains a reference to its owning pool.
   194  
   195  	fc          *flowController  // each connection has its own flow controller.
   196  	callOptions []gax.CallOption // custom calloptions for this connection.
   197  	ctx         context.Context  // retained context for maintaining the connection, derived from the owning pool.
   198  	cancel      context.CancelFunc
   199  
   200  	retry     *statelessRetryer
   201  	optimizer sendOptimizer
   202  
   203  	mu        sync.Mutex
   204  	arc       *storagepb.BigQueryWrite_AppendRowsClient // reference to the grpc connection (send, recv, close)
   205  	reconnect bool                                      // if set, the next send forces a reconnect.
   206  	err       error                                     // terminal connection error
   207  	pending   chan *pendingWrite
   208  
   209  	loadBytesThreshold int
   210  	loadCountThreshold int
   211  }
   212  
   213  type connectionMode string
   214  
   215  const (
   216  	multiplexConnectionMode connectionMode = "MULTIPLEX"
   217  	simplexConnectionMode   connectionMode = "SIMPLEX"
   218  	verboseConnectionMode   connectionMode = "VERBOSE"
   219  )
   220  
   221  func newConnection(pool *connectionPool, mode connectionMode, settings *streamSettings) *connection {
   222  	if pool == nil {
   223  		return nil
   224  	}
   225  	// create and retain a cancellable context.
   226  	connCtx, cancel := context.WithCancel(pool.ctx)
   227  
   228  	// Resolve local overrides for flow control and call options
   229  	fcRequests := 0
   230  	fcBytes := 0
   231  	var opts []gax.CallOption
   232  
   233  	if pool.baseFlowController != nil {
   234  		fcRequests = pool.baseFlowController.maxInsertCount
   235  		fcBytes = pool.baseFlowController.maxInsertBytes
   236  	}
   237  	if settings != nil {
   238  		if settings.MaxInflightRequests > 0 {
   239  			fcRequests = settings.MaxInflightRequests
   240  		}
   241  		if settings.MaxInflightBytes > 0 {
   242  			fcBytes = settings.MaxInflightBytes
   243  		}
   244  		opts = settings.appendCallOptions
   245  	}
   246  	fc := newFlowController(fcRequests, fcBytes)
   247  	countLimit, byteLimit := computeLoadThresholds(fc)
   248  
   249  	return &connection{
   250  		id:                 newUUID(connIDPrefix),
   251  		pool:               pool,
   252  		fc:                 fc,
   253  		ctx:                connCtx,
   254  		cancel:             cancel,
   255  		optimizer:          optimizer(mode),
   256  		loadBytesThreshold: byteLimit,
   257  		loadCountThreshold: countLimit,
   258  		callOptions:        opts,
   259  	}
   260  }
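
Per-writer settings win over the pool's base flow controller, so two writers sharing a pool can carry different in-flight limits. A small sketch with assumed values:

	settings := &streamSettings{
		MaxInflightRequests: 500,
		MaxInflightBytes:    64 * 1024 * 1024,
	}
	co := newConnection(pool, simplexConnectionMode, settings)
	// co.fc now caps traffic at 500 pending appends / 64 MiB in flight,
	// regardless of the limits carried by pool.baseFlowController.
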
   261  
   262  func computeLoadThresholds(fc *flowController) (countLimit, byteLimit int) {
   263  	countLimit = 1000
   264  	byteLimit = 0
   265  	if fc != nil {
   266  		if fc.maxInsertBytes > 0 {
   267  			// 20% of byte limit
   268  			byteLimit = int(float64(fc.maxInsertBytes) * 0.2)
   269  		}
   270  		if fc.maxInsertCount > 0 {
   271  			// MAX(1, 20% of insert limit)
   272  			countLimit = int(float64(fc.maxInsertCount) * 0.2)
   273  			if countLimit < 1 {
   274  				countLimit = 1
   275  			}
   276  		}
   277  	}
   278  	return
   279  }
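
As a worked example with assumed limits, a flow controller that permits 1000 in-flight requests and 100 MiB of in-flight bytes yields the following thresholds:

	fc := newFlowController(1000, 100*1024*1024)
	countLimit, byteLimit := computeLoadThresholds(fc)
	// countLimit == 200       (20% of 1000)
	// byteLimit  == 20971520  (20% of 100 MiB)
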
   280  
   281  func optimizer(mode connectionMode) sendOptimizer {
   282  	switch mode {
   283  	case multiplexConnectionMode:
   284  		return &multiplexOptimizer{}
   285  	case verboseConnectionMode:
   286  		return &verboseOptimizer{}
   287  	case simplexConnectionMode:
   288  		return &simplexOptimizer{}
   289  	}
   290  	return nil
   291  }
   292  
   293  // release is used to signal flow control release when a write is no longer in flight.
   294  func (co *connection) release(pw *pendingWrite) {
   295  	co.fc.release(pw.reqSize)
   296  }
   297  
   298  // isLoaded reports whether traffic on this connection is high enough to warrant adding more connections when multiplexing.
   299  func (co *connection) isLoaded() bool {
   300  	if co.loadCountThreshold > 0 && co.fc.count() > co.loadCountThreshold {
   301  		return true
   302  	}
   303  	if co.loadBytesThreshold > 0 && co.fc.bytes() > co.loadBytesThreshold {
   304  		return true
   305  	}
   306  	return false
   307  }
   308  
   309  // curLoad is a representation of connection load.
   310  // Its primary purpose is comparing the load of different connections.
   311  func (co *connection) curLoad() float64 {
   312  	load := float64(co.fc.count()) / float64(co.loadCountThreshold+1)
   313  	if co.fc.maxInsertBytes > 0 {
   314  		load += (float64(co.fc.bytes()) / float64(co.loadBytesThreshold+1))
   315  		load = load / 2
   316  	}
   317  	return load
   318  }
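
Continuing the example above, a connection with 50 appends and 5 MiB in flight against thresholds of 200 requests and 20 MiB reports the average of the two normalized ratios (values assumed; when maxInsertBytes is zero only the count ratio is used):

	countRatio = 50 / (200 + 1)             ≈ 0.249
	byteRatio  = 5 MiB / (20 MiB + 1 byte)  ≈ 0.250
	curLoad    = (0.249 + 0.250) / 2        ≈ 0.25
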
   319  
   320  // close closes a connection.
   321  func (co *connection) close() {
   322  	co.mu.Lock()
   323  	defer co.mu.Unlock()
   324  	// first, cancel the retained context.
   325  	if co.cancel != nil {
   326  		co.cancel()
   327  		co.cancel = nil
   328  	}
   329  	// close sending if we have a real ARC.
   330  	if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) {
   331  		(*co.arc).CloseSend()
   332  		co.arc = nil
   333  	}
   334  	// mark terminal error if not already set.
   335  	if co.err == nil {
   336  		co.err = io.EOF
   337  	}
   338  	// signal pending channel close.
   339  	if co.pending != nil {
   340  		close(co.pending)
   341  	}
   342  }
   343  
   344  // lockingAppend handles a single append request on a given connection.
   345  func (co *connection) lockingAppend(pw *pendingWrite) error {
   346  	// Don't bother calling/retrying if this append's context is already expired.
   347  	if err := pw.reqCtx.Err(); err != nil {
   348  		return err
   349  	}
   350  
   351  	if err := co.fc.acquire(pw.reqCtx, pw.reqSize); err != nil {
   352  		// We've failed to acquire.  This may get retried on a different connection, so marking the write done is incorrect.
   353  		return err
   354  	}
   355  
   356  	var statsOnExit func(ctx context.Context)
   357  
   358  	// critical section:  Things that need to happen inside the critical section:
   359  	//
   360  	// * get/open connection
   361  	// * issue the append
   362  	// * add the pending write to the channel for the connection (ordering for the response)
   363  	co.mu.Lock()
   364  	defer func() {
   365  		sCtx := co.ctx
   366  		co.mu.Unlock()
   367  		if statsOnExit != nil && sCtx != nil {
   368  			statsOnExit(sCtx)
   369  		}
   370  	}()
   371  
   372  	var arc *storagepb.BigQueryWrite_AppendRowsClient
   373  	var ch chan *pendingWrite
   374  	var err error
   375  
   376  	// Handle promotion of per-request schema to default schema in the case of updates.
   377  	// Additionally, we check multiplex status as schema changes for explicit streams
   378  	// require reconnect, whereas multiplex does not.
   379  	forceReconnect := false
   380  	promoted := false
   381  	if pw.writer != nil && pw.reqTmpl != nil {
   382  		if !pw.reqTmpl.Compatible(pw.writer.curTemplate) {
   383  			if pw.writer.curTemplate == nil {
   384  				// promote because there's no current template
   385  				pw.writer.curTemplate = pw.reqTmpl
   386  				promoted = true
   387  			} else {
   388  				if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) {
   389  					pw.writer.curTemplate = pw.reqTmpl
   390  					promoted = true
   391  				}
   392  			}
   393  		}
   394  	}
   395  	if promoted {
   396  		if co.optimizer == nil {
   397  			forceReconnect = true
   398  		} else {
   399  			if !co.optimizer.isMultiplexing() {
   400  				forceReconnect = true
   401  			}
   402  		}
   403  	}
   404  
   405  	arc, ch, err = co.getStream(arc, forceReconnect)
   406  	if err != nil {
   407  		return err
   408  	}
   409  
   410  	pw.attemptCount++
   411  	if co.optimizer != nil {
   412  		err = co.optimizer.optimizeSend((*arc), pw)
   413  		if err != nil {
   414  			// Reset optimizer state on error.
   415  			co.optimizer.signalReset()
   416  		}
   417  	} else {
   418  		// No optimizer present, send a fully populated request.
   419  		err = (*arc).Send(pw.constructFullRequest(true))
   420  	}
   421  	if err != nil {
   422  		// Refund the flow controller immediately, as there's nothing to refund on the receiver.
   423  		co.fc.release(pw.reqSize)
   424  		if shouldReconnect(err) {
   425  			metricCtx := co.ctx // start with the ctx that must be present
   426  			if pw.writer != nil {
   427  				metricCtx = pw.writer.ctx // the writer ctx bears the stream/origin tagging, so prefer it.
   428  			}
   429  			if tagCtx, tagErr := tag.New(metricCtx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil {
   430  				metricCtx = tagCtx
   431  			}
   432  			recordStat(metricCtx, AppendRequestReconnects, 1)
   433  			// if we think this connection is unhealthy, force a reconnect on the next send.
   434  			co.reconnect = true
   435  		}
   436  		return err
   437  	}
   438  
   439  	// Compute numRows now; once we pass ownership to the channel the request
   440  	// may be cleared.
   441  	var numRows int64
   442  	if r := pw.req.GetProtoRows(); r != nil {
   443  		if pr := r.GetRows(); pr != nil {
   444  			numRows = int64(len(pr.GetSerializedRows()))
   445  		}
   446  	}
   447  	statsOnExit = func(ctx context.Context) {
   448  		// these will get recorded once we exit the critical section.
   449  		// TODO: resolve open questions around what labels should be attached (connection, streamID, etc)
   450  		recordStat(ctx, AppendRequestRows, numRows)
   451  		recordStat(ctx, AppendRequests, 1)
   452  		recordStat(ctx, AppendRequestBytes, int64(pw.reqSize))
   453  	}
   454  	ch <- pw
   455  	return nil
   456  }
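
The row accounting above walks the proto-rows branch of the request. Roughly, for a request carrying two serialized rows (serializedRow1 and serializedRow2 are assumed []byte values):

	req := &storagepb.AppendRowsRequest{
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				Rows: &storagepb.ProtoRows{
					SerializedRows: [][]byte{serializedRow1, serializedRow2},
				},
			},
		},
	}
	numRows := int64(len(req.GetProtoRows().GetRows().GetSerializedRows())) // == 2
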
   457  
   458  // getStream returns either a valid ARC client stream or a permanent error.
   459  //
   460  // Any calls to getStream should be made while in possession of the critical section lock.
   461  func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
   462  	if co.err != nil {
   463  		return nil, nil, co.err
   464  	}
   465  	co.err = co.ctx.Err()
   466  	if co.err != nil {
   467  		return nil, nil, co.err
   468  	}
   469  
   470  	// Previous activity on the stream indicated it is not healthy, so propagate that as a reconnect.
   471  	if co.reconnect {
   472  		forceReconnect = true
   473  		co.reconnect = false
   474  	}
   475  	// If the caller's ARC is stale (it differs from the retained one) and no reconnect is forced, return the retained ARC and channel.
   476  	if arc != co.arc && !forceReconnect {
   477  		return co.arc, co.pending, nil
   478  	}
   479  	// We need to (re)open a connection.  Clean up the previous connection, channel, and context if they are present.
   480  	if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) {
   481  		(*co.arc).CloseSend()
   482  	}
   483  	if co.pending != nil {
   484  		close(co.pending)
   485  	}
   486  	if co.cancel != nil {
   487  		co.cancel()
   488  		co.ctx, co.cancel = context.WithCancel(co.pool.ctx)
   489  	}
   490  
   491  	co.arc = new(storagepb.BigQueryWrite_AppendRowsClient)
   492  	// We're going to (re)open the connection, so clear any optimizer state.
   493  	if co.optimizer != nil {
   494  		co.optimizer.signalReset()
   495  	}
   496  	*co.arc, co.pending, co.err = co.pool.openWithRetry(co)
   497  	return co.arc, co.pending, co.err
   498  }
   499  
   500  // enables testing
   501  type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
   502  
   503  var errConnectionCanceled = grpcstatus.Error(codes.Canceled, "client connection context was canceled")
   504  
   505  // connRecvProcessor is used to propagate append responses back up with the originating write requests.
   506  // It runs as a goroutine.  A connection object allows for reconnection, and each reconnection establishes a new
   507  // context, processing goroutine and backing channel.
   508  func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
   509  	for {
   510  		select {
   511  		case <-ctx.Done():
   512  			// Channel context is done, which means we're not getting further updates on in flight appends and should
   513  			// process everything left in the existing channel/connection.
   514  			doneErr := ctx.Err()
   515  			if doneErr == context.Canceled {
   516  				// This is a special case.  Connection recovery ends up cancelling a context as part of a reconnection, and with
   517  				// request retrying enabled we can possibly re-enqueue writes.  To allow graceful retry for this behavior, we
   518  				// translate this to an rpc status error to avoid doing things like introducing context errors as part of the retry predicate.
   519  				//
   520  				// The tradeoff here is that write retries may roundtrip multiple times for something like a pool shutdown, even though the final
   521  				// outcome would result in an error.
   522  				doneErr = errConnectionCanceled
   523  			}
   524  			for {
   525  				pw, ok := <-ch
   526  				if !ok {
   527  					return
   528  				}
   529  				// This connection will not recover, but still attempt to keep flow controller state consistent.
   530  				co.release(pw)
   531  
   532  				// TODO:  Determine if/how we should report this case, as we have no viable context for propagating.
   533  
   534  				// Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
   535  				pw.writer.processRetry(pw, co, nil, doneErr)
   536  			}
   537  		case nextWrite, ok := <-ch:
   538  			if !ok {
   539  				// Channel closed, all elements processed.
   540  				return
   541  			}
   542  			// block until we get a corresponding response or err from stream.
   543  			resp, err := arc.Recv()
   544  			co.release(nextWrite)
   545  			if err != nil {
   546  				// The Recv() itself yielded an error.  We increment AppendResponseErrors by one, tagged by the status
   547  				// code.
   548  				status := grpcstatus.Convert(err)
   549  				metricCtx := ctx
   550  				if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.Code()).String())); tagErr == nil {
   551  					metricCtx = tagCtx
   552  				}
   553  				recordStat(metricCtx, AppendResponseErrors, 1)
   554  
   555  				nextWrite.writer.processRetry(nextWrite, co, nil, err)
   556  				continue
   557  			}
   558  			// Record that we did in fact get a response from the backend.
   559  			recordStat(ctx, AppendResponses, 1)
   560  
   561  			if status := resp.GetError(); status != nil {
   562  				// The response was received successfully, but the response embeds a status error in the payload.
   563  				// Increment AppendResponseErrors, tagged by status code.
   564  				metricCtx := ctx
   565  				if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
   566  					metricCtx = tagCtx
   567  				}
   568  				recordStat(metricCtx, AppendResponseErrors, 1)
   569  				respErr := grpcstatus.ErrorProto(status)
   570  
   571  				nextWrite.writer.processRetry(nextWrite, co, resp, respErr)
   572  
   573  				continue
   574  			}
   575  			// We had no error in the receive or in the response.  Mark the write done.
   576  			nextWrite.markDone(resp, nil)
   577  		}
   578  	}
   579  }
   580  
