...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/appendresult.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  
    20  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    21  	"github.com/googleapis/gax-go/v2/apierror"
    22  	grpcstatus "google.golang.org/grpc/status"
    23  	"google.golang.org/protobuf/proto"
    24  )
    25  
    26  // NoStreamOffset is a sentinel value for signalling we're not tracking
    27  // stream offset (e.g. a default stream which allows simultaneous append streams).
    28  const NoStreamOffset int64 = -1
    29  
    30  // AppendResult tracks the status of a batch of data rows.
    31  type AppendResult struct {
    32  	ready chan struct{}
    33  
    34  	// if the append failed without a response, this will retain a reference to the error.
    35  	err error
    36  
    37  	// retains the original response.
    38  	response *storagepb.AppendRowsResponse
    39  
    40  	// retains the number of times this individual write was enqueued.
    41  	totalAttempts int
    42  }
    43  
    44  func newAppendResult() *AppendResult {
    45  	return &AppendResult{
    46  		ready: make(chan struct{}),
    47  	}
    48  }
    49  
    50  // Ready blocks until the append request has reached a completed state,
    51  // which may be a successful append or an error.
    52  func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready }
    53  
    54  // GetResult returns the optional offset of this row, as well as any error encountered while
    55  // processing the append.
    56  //
    57  // This call blocks until the result is ready, or context is no longer valid.
    58  func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
    59  	select {
    60  	case <-ctx.Done():
    61  		return NoStreamOffset, ctx.Err()
    62  	case <-ar.Ready():
    63  		full, err := ar.FullResponse(ctx)
    64  		offset := NoStreamOffset
    65  		if full != nil {
    66  			if result := full.GetAppendResult(); result != nil {
    67  				if off := result.GetOffset(); off != nil {
    68  					offset = off.GetValue()
    69  				}
    70  			}
    71  		}
    72  		return offset, err
    73  	}
    74  }
    75  
    76  // FullResponse returns the full content of the AppendRowsResponse, and any error encountered while
    77  // processing the append.
    78  //
    79  // The AppendRowResponse may contain an embedded error.  An embedded error in the response will be
    80  // converted and returned as the error response, so this method may return both the
    81  // AppendRowsResponse and an error.
    82  //
    83  // This call blocks until the result is ready, or context is no longer valid.
    84  func (ar *AppendResult) FullResponse(ctx context.Context) (*storagepb.AppendRowsResponse, error) {
    85  	select {
    86  	case <-ctx.Done():
    87  		return nil, ctx.Err()
    88  	case <-ar.Ready():
    89  		var err error
    90  		if ar.err != nil {
    91  			err = ar.err
    92  		} else {
    93  			if ar.response != nil {
    94  				if status := ar.response.GetError(); status != nil {
    95  					statusErr := grpcstatus.ErrorProto(status)
    96  					// Provide an APIError if possible.
    97  					if apiErr, ok := apierror.FromError(statusErr); ok {
    98  						err = apiErr
    99  					} else {
   100  						err = statusErr
   101  					}
   102  				}
   103  			}
   104  		}
   105  		if ar.response != nil {
   106  			return proto.Clone(ar.response).(*storagepb.AppendRowsResponse), err
   107  		}
   108  		return nil, err
   109  	}
   110  }
   111  
   112  func (ar *AppendResult) offset(ctx context.Context) int64 {
   113  	select {
   114  	case <-ctx.Done():
   115  		return NoStreamOffset
   116  	case <-ar.Ready():
   117  		if ar.response != nil {
   118  			if result := ar.response.GetAppendResult(); result != nil {
   119  				if off := result.GetOffset(); off != nil {
   120  					return off.GetValue()
   121  				}
   122  			}
   123  		}
   124  		return NoStreamOffset
   125  	}
   126  }
   127  
   128  // UpdatedSchema returns the updated schema for a table if supplied by the backend as part
   129  // of the append response.
   130  //
   131  // This call blocks until the result is ready, or context is no longer valid.
   132  func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSchema, error) {
   133  	select {
   134  	case <-ctx.Done():
   135  		return nil, ctx.Err()
   136  	case <-ar.Ready():
   137  		if ar.response != nil {
   138  			if schema := ar.response.GetUpdatedSchema(); schema != nil {
   139  				return proto.Clone(schema).(*storagepb.TableSchema), nil
   140  			}
   141  		}
   142  		return nil, nil
   143  	}
   144  }
   145  
   146  // TotalAttempts returns the number of times this write was attempted.
   147  //
   148  // This call blocks until the result is ready, or context is no longer valid.
   149  func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) {
   150  	select {
   151  	case <-ctx.Done():
   152  		return 0, ctx.Err()
   153  	case <-ar.Ready():
   154  		return ar.totalAttempts, nil
   155  	}
   156  }
   157  
   158  // pendingWrite tracks state for a set of rows that are part of a single
   159  // append request.
   160  type pendingWrite struct {
   161  	// writer retains a reference to the origin of a pending write.  Primary
   162  	// used is to inform routing decisions.
   163  	writer *ManagedStream
   164  
   165  	// We store the request as it's simplex-optimized form, as statistically that's the most
   166  	// likely outcome when processing requests and it allows us to be efficient on send.
   167  	// We retain the additional information to build the complete request in the related fields.
   168  	req           *storagepb.AppendRowsRequest
   169  	reqTmpl       *versionedTemplate // request template at time of creation
   170  	traceID       string
   171  	writeStreamID string
   172  
   173  	// Reference to the AppendResult which is exposed to the user.
   174  	result *AppendResult
   175  
   176  	// Flow control is based on the unoptimized request size.
   177  	reqSize int
   178  
   179  	// retains the original request context, primarily for checking against
   180  	// cancellation signals.
   181  	reqCtx context.Context
   182  
   183  	// tracks the number of times we've attempted this append request.
   184  	attemptCount int
   185  }
   186  
   187  // newPendingWrite constructs the proto request and attaches references
   188  // to the pending results for later consumption.  The provided context is
   189  // embedded in the pending write, as the write may be retried and we want
   190  // to respect the original context for expiry/cancellation etc.
   191  func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, reqTmpl *versionedTemplate, writeStreamID, traceID string) *pendingWrite {
   192  	pw := &pendingWrite{
   193  		writer: src,
   194  		result: newAppendResult(),
   195  		reqCtx: ctx,
   196  
   197  		req:           req,     // minimal req, typically just row data
   198  		reqTmpl:       reqTmpl, // remainder of templated request
   199  		writeStreamID: writeStreamID,
   200  		traceID:       traceID,
   201  	}
   202  	// Compute the approx size for flow control purposes.
   203  	pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID)
   204  	if pw.reqTmpl != nil {
   205  		pw.reqSize += proto.Size(pw.reqTmpl.tmpl)
   206  	}
   207  	return pw
   208  }
   209  
   210  // markDone propagates finalization of an append request to the associated
   211  // AppendResult.
   212  func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error) {
   213  	// First, propagate necessary state from the pendingWrite to the final result.
   214  	if resp != nil {
   215  		pw.result.response = resp
   216  	}
   217  	pw.result.err = err
   218  	pw.result.totalAttempts = pw.attemptCount
   219  
   220  	// Close the result's ready channel.
   221  	close(pw.result.ready)
   222  	// Cleanup references remaining on the write explicitly.
   223  	pw.req = nil
   224  	pw.reqTmpl = nil
   225  	pw.writer = nil
   226  	pw.reqCtx = nil
   227  }
   228  
   229  func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest {
   230  	req := &storagepb.AppendRowsRequest{}
   231  	if pw.reqTmpl != nil {
   232  		req = proto.Clone(pw.reqTmpl.tmpl).(*storagepb.AppendRowsRequest)
   233  	}
   234  	if pw.req != nil {
   235  		proto.Merge(req, pw.req)
   236  	}
   237  	if addTrace {
   238  		req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID})
   239  	}
   240  	req.WriteStream = pw.writeStreamID
   241  	return req
   242  }
   243  

View as plain text