...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"io"
    21  	"sync"
    22  	"time"
    23  
    24  	"cloud.google.com/go/bigquery/internal"
    25  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    26  	"github.com/googleapis/gax-go/v2"
    27  	"go.opencensus.io/tag"
    28  	"google.golang.org/grpc"
    29  	grpcstatus "google.golang.org/grpc/status"
    30  	"google.golang.org/protobuf/types/known/wrapperspb"
    31  )
    32  
    33  // StreamType indicates the type of stream this write client is managing.
    34  type StreamType string
    35  
    36  var (
    37  	// DefaultStream most closely mimics the legacy bigquery
    38  	// tabledata.insertAll semantics.  Successful inserts are
    39  	// committed immediately, and there's no tracking offsets as
    40  	// all writes go into a "default" stream that always exists
    41  	// for a table.
    42  	DefaultStream StreamType = "DEFAULT"
    43  
    44  	// CommittedStream appends data immediately, but creates a
    45  	// discrete stream for the work so that offset tracking can
    46  	// be used to track writes.
    47  	CommittedStream StreamType = "COMMITTED"
    48  
    49  	// BufferedStream is a form of checkpointed stream, that allows
    50  	// you to advance the offset of visible rows via Flush operations.
    51  	BufferedStream StreamType = "BUFFERED"
    52  
    53  	// PendingStream is a stream in which no data is made visible to
    54  	// readers until the stream is finalized and committed explicitly.
    55  	PendingStream StreamType = "PENDING"
    56  )
    57  
    58  func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
    59  	switch t {
    60  	case CommittedStream:
    61  		return storagepb.WriteStream_COMMITTED
    62  	case PendingStream:
    63  		return storagepb.WriteStream_PENDING
    64  	case BufferedStream:
    65  		return storagepb.WriteStream_BUFFERED
    66  	default:
    67  		return storagepb.WriteStream_TYPE_UNSPECIFIED
    68  	}
    69  }
    70  
// ManagedStream is the abstraction over a single write stream.
//
// Appends are dispatched through the writer's connection pool, while unary
// operations such as FlushRows and Finalize go through the owning Client.
// Once a terminal error is recorded in err (io.EOF after a normal Close),
// subsequent appends on the writer are rejected with that error.
type ManagedStream struct {
	// Unique id for the managedstream instance.
	id string

	// pool retains a reference to the writer's pool.  A writer is only associated to a single pool.
	pool *connectionPool

	// streamSettings holds the writer's resolved configuration, such as the
	// destination stream ID, the stream type, and the trace ID.
	streamSettings *streamSettings
	// curTemplate retains the current descriptor for the stream; AppendRows
	// snapshots it when building each pending write.
	// c is the owning client; its rawClient issues the FlushRows and
	// FinalizeWriteStream RPCs.
	// retry is an optional writer-specific retryer.  When nil,
	// statelessRetryer() falls back to the pool's default retryer.
	curTemplate *versionedTemplate
	c           *Client
	retry       *statelessRetryer

	// writer state
	//
	// mu serializes access to the terminal error (err) and to cancel.  Close
	// invokes cancel to end ctx and records io.EOF in err if no other terminal
	// error was set.
	mu     sync.Mutex
	ctx    context.Context // used for stats/instrumentation, and to check the writer is live.
	cancel context.CancelFunc
	err    error // retains any terminal error (writer was closed)
}
    91  
// streamSettings is for capturing configuration and option information.
//
// defaultStreamSettings supplies the baseline values.  Presumably writer
// options adjust these fields before the ManagedStream is used (TODO confirm
// against the option code, which is not in this file).
type streamSettings struct {

	// streamID contains the reference to the destination stream.
	streamID string

	// streamType governs behavior of the client, such as how
	// offset handling is managed.
	streamType StreamType

	// MaxInflightRequests governs how many unacknowledged
	// append writes can be outstanding into the system.
	MaxInflightRequests int

	// MaxInflightBytes governs how many unacknowledged
	// request bytes can be outstanding into the system.
	MaxInflightBytes int

	// TraceID can be set when appending data on a stream. Its
	// purpose is to aid in debug and diagnostic scenarios.
	// buildTraceID prefixes it with a client identifier.
	TraceID string

	// dataOrigin can be set for classifying metrics generated
	// by a stream.
	dataOrigin string

	// retains reference to the target table when resolving settings
	destinationTable string

	// appendCallOptions are gax call options, presumably applied to the append
	// RPC (confirm at the call site).  The defaults raise the maximum gRPC
	// receive message size to 10 MiB.
	appendCallOptions []gax.CallOption

	// multiplex indicates whether connection multiplexing is enabled for this
	// writer.  NOTE(review): presumably this lets the writer share pooled
	// connections with other writers; verify in the connection pool code.
	multiplex bool

	// retain a copy of the stream client func.
	streamFunc streamClientFunc
}
   129  
   130  func defaultStreamSettings() *streamSettings {
   131  	return &streamSettings{
   132  		streamType:          DefaultStream,
   133  		MaxInflightRequests: 1000,
   134  		MaxInflightBytes:    0,
   135  		appendCallOptions: []gax.CallOption{
   136  			gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
   137  		},
   138  	}
   139  }
   140  
   141  // buildTraceID handles prefixing of a user-supplied trace ID with a client identifier.
   142  func buildTraceID(s *streamSettings) string {
   143  	base := fmt.Sprintf("go-managedwriter:%s", internal.Version)
   144  	if s != nil && s.TraceID != "" {
   145  		return fmt.Sprintf("%s %s", base, s.TraceID)
   146  	}
   147  	return base
   148  }
   149  
   150  // StreamName returns the corresponding write stream ID being managed by this writer.
   151  func (ms *ManagedStream) StreamName() string {
   152  	return ms.streamSettings.streamID
   153  }
   154  
   155  // StreamType returns the configured type for this stream.
   156  func (ms *ManagedStream) StreamType() StreamType {
   157  	return ms.streamSettings.streamType
   158  }
   159  
   160  // FlushRows advances the offset at which rows in a BufferedStream are visible.  Calling
   161  // this method for other stream types yields an error.
   162  func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) {
   163  	req := &storagepb.FlushRowsRequest{
   164  		WriteStream: ms.streamSettings.streamID,
   165  		Offset: &wrapperspb.Int64Value{
   166  			Value: offset,
   167  		},
   168  	}
   169  	resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...)
   170  	recordWriterStat(ms, FlushRequests, 1)
   171  	if err != nil {
   172  		return 0, err
   173  	}
   174  	return resp.GetOffset(), nil
   175  }
   176  
   177  // Finalize is used to mark a stream as complete, and thus ensure no further data can
   178  // be appended to the stream.  You cannot finalize a DefaultStream, as it always exists.
   179  //
   180  // Finalizing does not advance the current offset of a BufferedStream, nor does it commit
   181  // data in a PendingStream.
   182  func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) {
   183  	// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
   184  	req := &storagepb.FinalizeWriteStreamRequest{
   185  		Name: ms.streamSettings.streamID,
   186  	}
   187  	resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req, opts...)
   188  	if err != nil {
   189  		return 0, err
   190  	}
   191  	return resp.GetRowCount(), nil
   192  }
   193  
   194  // appendWithRetry handles the details of adding sending an append request on a stream.  Appends are sent on a long
   195  // lived bidirectional network stream, with it's own managed context (ms.ctx), and there's a per-request context
   196  // attached to the pendingWrite.
   197  func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {
   198  	for {
   199  		ms.mu.Lock()
   200  		err := ms.err
   201  		ms.mu.Unlock()
   202  		if err != nil {
   203  			return err
   204  		}
   205  		conn, err := ms.pool.selectConn(pw)
   206  		if err != nil {
   207  			pw.markDone(nil, err)
   208  			return err
   209  		}
   210  		appendErr := conn.lockingAppend(pw)
   211  		if appendErr != nil {
   212  			// Append yielded an error.  Retry by continuing or return.
   213  			status := grpcstatus.Convert(appendErr)
   214  			if status != nil {
   215  				recordCtx := ms.ctx
   216  				if ctx, err := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())); err == nil {
   217  					recordCtx = ctx
   218  				}
   219  				recordStat(recordCtx, AppendRequestErrors, 1)
   220  			}
   221  			bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount)
   222  			if shouldRetry {
   223  				if err := gax.Sleep(ms.ctx, bo); err != nil {
   224  					return err
   225  				}
   226  				continue
   227  			}
   228  			// This append cannot be retried locally.  It is not the responsibility of this function to finalize the pending
   229  			// write however, as that's handled by callers.
   230  			// Related: https://github.com/googleapis/google-cloud-go/issues/7380
   231  			return appendErr
   232  		}
   233  		return nil
   234  	}
   235  }
   236  
   237  // Close closes a managed stream.
   238  func (ms *ManagedStream) Close() error {
   239  
   240  	ms.mu.Lock()
   241  	defer ms.mu.Unlock()
   242  
   243  	var returned error
   244  
   245  	if ms.pool != nil {
   246  		if err := ms.pool.removeWriter(ms); err != nil {
   247  			returned = err
   248  		}
   249  	}
   250  
   251  	// Cancel the underlying context for the stream, we don't allow re-open.
   252  	if ms.cancel != nil {
   253  		ms.cancel()
   254  		ms.cancel = nil
   255  	}
   256  
   257  	// For normal operation, mark the stream error as io.EOF.
   258  	if ms.err == nil {
   259  		ms.err = io.EOF
   260  	}
   261  	if returned == nil {
   262  		returned = ms.err
   263  	}
   264  	return returned
   265  }
   266  
   267  // buildRequest constructs an optimized AppendRowsRequest.
   268  // Offset (if specified) is applied later.
   269  func (ms *ManagedStream) buildRequest(data [][]byte) *storagepb.AppendRowsRequest {
   270  	return &storagepb.AppendRowsRequest{
   271  		Rows: &storagepb.AppendRowsRequest_ProtoRows{
   272  			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
   273  				Rows: &storagepb.ProtoRows{
   274  					SerializedRows: data,
   275  				},
   276  			},
   277  		},
   278  	}
   279  }
   280  
   281  // AppendRows sends the append requests to the service, and returns a single AppendResult for tracking
   282  // the set of data.
   283  //
   284  // The format of the row data is binary serialized protocol buffer bytes.  The message must be compatible
   285  // with the schema currently set for the stream.
   286  //
   287  // Use the WithOffset() AppendOption to set an explicit offset for this append.  Setting an offset for
   288  // a default stream is unsupported.
   289  //
   290  // The size of a single request must be less than 10 MB in size.
   291  // Requests larger than this return an error, typically `INVALID_ARGUMENT`.
   292  func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error) {
   293  	// before we do anything, ensure the writer isn't closed.
   294  	ms.mu.Lock()
   295  	err := ms.err
   296  	ms.mu.Unlock()
   297  	if err != nil {
   298  		return nil, err
   299  	}
   300  	// Ensure we build the request and pending write with a consistent schema version.
   301  	curTemplate := ms.curTemplate
   302  	req := ms.buildRequest(data)
   303  	pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID)
   304  	// apply AppendOption opts
   305  	for _, opt := range opts {
   306  		opt(pw)
   307  	}
   308  	// Post-request fixup after options are applied.
   309  	if pw.reqTmpl != nil {
   310  		if pw.reqTmpl.tmpl != nil {
   311  			// MVIs must be set on each request, but _default_ MVIs persist across the stream lifetime.  Sigh.
   312  			pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations()
   313  		}
   314  	}
   315  
   316  	// Call the underlying append.  The stream has it's own retained context and will surface expiry on
   317  	// it's own, but we also need to respect any deadline for the provided context.
   318  	errCh := make(chan error)
   319  	var appendErr error
   320  	go func() {
   321  		select {
   322  		case errCh <- ms.appendWithRetry(pw):
   323  		case <-ctx.Done():
   324  		case <-ms.ctx.Done():
   325  		}
   326  		close(errCh)
   327  	}()
   328  	select {
   329  	case <-ctx.Done():
   330  		// It is incorrect to simply mark the request done, as it's potentially in flight in the bidi stream
   331  		// where we can't propagate a cancellation.  Our options are to return the pending write even though
   332  		// it's in an ambiguous state, or to return the error and simply drop the pending write on the floor.
   333  		//
   334  		// This API expresses request idempotency through offset management, so users who care to use offsets
   335  		// can deal with the dropped request.
   336  		return nil, ctx.Err()
   337  	case <-ms.ctx.Done():
   338  		// Same as the request context being done, this indicates the writer context expired.  For this case,
   339  		// we also attempt to close the writer.
   340  		ms.mu.Lock()
   341  		if ms.err == nil {
   342  			ms.err = ms.ctx.Err()
   343  		}
   344  		ms.mu.Unlock()
   345  		ms.Close()
   346  		// Don't relock to fetch the writer terminal error, as we've already ensured that the writer is closed.
   347  		return nil, ms.err
   348  	case appendErr = <-errCh:
   349  		if appendErr != nil {
   350  			return nil, appendErr
   351  		}
   352  		return pw.result, nil
   353  	}
   354  }
   355  
   356  // processRetry is responsible for evaluating and re-enqueing an append.
   357  // If the append is not retried, it is marked complete.
   358  func (ms *ManagedStream) processRetry(pw *pendingWrite, srcConn *connection, appendResp *storagepb.AppendRowsResponse, initialErr error) {
   359  	err := initialErr
   360  	for {
   361  		pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount)
   362  		if !shouldRetry {
   363  			// Should not attempt to re-append.
   364  			pw.markDone(appendResp, err)
   365  			return
   366  		}
   367  		time.Sleep(pause)
   368  		err = ms.appendWithRetry(pw)
   369  		if err != nil {
   370  			// Re-enqueue failed, send it through the loop again.
   371  			continue
   372  		}
   373  		// Break out of the loop, we were successful and the write has been
   374  		// re-inserted.
   375  		recordWriterStat(ms, AppendRetryCount, 1)
   376  		break
   377  	}
   378  }
   379  
   380  // returns the stateless retryer.  If one's not set (re-enqueue retries disabled),
   381  // it returns a retryer that only permits single attempts.
   382  func (ms *ManagedStream) statelessRetryer() *statelessRetryer {
   383  	if ms.retry != nil {
   384  		return ms.retry
   385  	}
   386  	if ms.pool != nil {
   387  		return ms.pool.defaultRetryer()
   388  	}
   389  	return &statelessRetryer{
   390  		maxAttempts: 1,
   391  	}
   392  }
   393  

View as plain text