
Source file src/cloud.google.com/go/bigquery/storage/managedwriter/send_optimizer.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
	"bytes"
	"encoding/binary"
	"hash/crc32"
	"time"

	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/descriptorpb"
)
// sendOptimizer handles the general task of optimizing AppendRowsRequest messages sent to the backend.
//
// The general premise is that the ordering of AppendRowsRequests on a connection provides some opportunities
// to reduce payload size, thus potentially increasing throughput.  Care must be taken, however, as deep inspection
// of requests is potentially more costly (in terms of CPU usage) than gains from reducing request sizes.
type sendOptimizer interface {
	// signalReset is used to signal to the optimizer that the connection is freshly (re)opened, or that a previous
	// send yielded an error.
	signalReset()

	// optimizeSend handles possible manipulation of a request, and triggers the send.
	optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error

	// isMultiplexing reports whether we've actually sent writes to more than a single stream on this connection.
	isMultiplexing() bool
}
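
// A minimal, hypothetical sketch of how a connection might select among the
// implementations below; optimizerForConn is illustrative only and not part
// of this package's API.
func optimizerForConn(multiplex bool) sendOptimizer {
	if multiplex {
		// Multiple default streams may share this connection.
		return &multiplexOptimizer{}
	}
	// All traffic on the connection targets a single stream.
	return &simplexOptimizer{}
}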

// verboseOptimizer is primarily a testing optimizer that always sends the full request.
type verboseOptimizer struct {
}

func (vo *verboseOptimizer) signalReset() {
	// This optimizer is stateless.
}

// optimizeSend populates a full request every time.
func (vo *verboseOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	return arc.Send(pw.constructFullRequest(true))
}

func (vo *verboseOptimizer) isMultiplexing() bool {
	// We declare this false to ensure we always reconnect on schema changes.
	return false
}

// simplexOptimizer is used for connections bearing AppendRowsRequest for only a single stream.
//
// The optimizations here are straightforward:
// * The first request on a connection is unmodified.
// * Subsequent requests can redact WriteStream, WriterSchema, and TraceID.
//
// Behavior of schema evolution differs based on the type of stream.
// * For an explicit stream, the connection must reconnect to signal schema change (handled in connection).
// * For default streams, the new descriptor (inside WriterSchema) can simply be sent.
type simplexOptimizer struct {
	haveSent bool
}

func (so *simplexOptimizer) signalReset() {
	so.haveSent = false
}

func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	var err error
	if so.haveSent {
		// Subsequent send; the bare request suffices, with WriteStream, WriterSchema, and TraceID redacted.
		err = arc.Send(pw.req)
	} else {
		// First request on the connection; build a full request.
		err = arc.Send(pw.constructFullRequest(true))
	}
	so.haveSent = err == nil
	return err
}

func (so *simplexOptimizer) isMultiplexing() bool {
	// A simplex optimizer is not designed for multiplexing.
	return false
}
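
// A hypothetical walkthrough of the simplex send sequence; exampleSimplexSends
// is illustrative only and not part of this package's API.
func exampleSimplexSends(arc storagepb.BigQueryWrite_AppendRowsClient, pw1, pw2, pw3 *pendingWrite) error {
	so := &simplexOptimizer{}
	if err := so.optimizeSend(arc, pw1); err != nil {
		return err // first send carries stream, schema, and traceID
	}
	if err := so.optimizeSend(arc, pw2); err != nil {
		return err // subsequent send goes out as the bare pw2.req
	}
	// After a reset (reconnect or send error), the next send is full again.
	so.signalReset()
	return so.optimizeSend(arc, pw3)
}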

// multiplexOptimizer is used for connections where requests for multiple default streams are sent on a common
// connection.  Only default streams can currently be multiplexed.
//
// In this case, the optimizations are as follows:
// * We must send the WriteStream on all requests.
// * For sequential requests to the same stream, schema can be redacted after the first request.
// * Trace ID can be redacted from all requests after the first.
//
// Schema evolution is simply a case of sending the new WriterSchema as part of the request(s).  No explicit
// reconnection is necessary.
type multiplexOptimizer struct {
	prevStream       string
	prevTemplate     *versionedTemplate
	multiplexStreams bool
}

func (mo *multiplexOptimizer) signalReset() {
	mo.prevStream = ""
	mo.multiplexStreams = false
	mo.prevTemplate = nil
}

func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	var err error
	if mo.prevStream == "" {
		// Startup case; send a full request (with traceID).
		req := pw.constructFullRequest(true)
		err = arc.Send(req)
		if err == nil {
			mo.prevStream = req.GetWriteStream()
			mo.prevTemplate = pw.reqTmpl
		}
	} else {
		// We have a previous send.  Determine if it's the same stream or a different one.
		if mo.prevStream == pw.writeStreamID {
			// Add the stream ID to the optimized request, as multiplex-optimization wants it present.
			if pw.req.GetWriteStream() == "" {
				pw.req.WriteStream = pw.writeStreamID
			}
			// swapOnSuccess tracks whether we need to update schema versions on successful send.
			swapOnSuccess := false
			req := pw.req
			if mo.prevTemplate != nil {
				if !mo.prevTemplate.Compatible(pw.reqTmpl) {
					swapOnSuccess = true
					req = pw.constructFullRequest(false) // full request minus traceID.
				}
			}
			err = arc.Send(req)
			if err == nil && swapOnSuccess {
				mo.prevTemplate = pw.reqTmpl
			}
		} else {
			// The previous send was for a different stream.  Send a full request, minus traceID.
			req := pw.constructFullRequest(false)
			err = arc.Send(req)
			if err == nil {
				// Send successful.  Update state to reflect this send is now the "previous" state.
				mo.prevStream = pw.writeStreamID
				mo.prevTemplate = pw.reqTmpl
			}
			// Also note that we've now sent traffic for multiple streams, which means the backend
			// recognizes this connection as multiplexed as well.
			mo.multiplexStreams = true
		}
	}
	return err
}

func (mo *multiplexOptimizer) isMultiplexing() bool {
	return mo.multiplexStreams
}
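
// A hypothetical walkthrough of multiplexed sends across two default streams
// (pwA1 and pwA2 target one stream, pwB1 another); exampleMultiplexSends is
// illustrative only and not part of this package's API.
func exampleMultiplexSends(arc storagepb.BigQueryWrite_AppendRowsClient, pwA1, pwA2, pwB1 *pendingWrite) error {
	mo := &multiplexOptimizer{}
	if err := mo.optimizeSend(arc, pwA1); err != nil {
		return err // first send on the connection: full request with traceID
	}
	if err := mo.optimizeSend(arc, pwA2); err != nil {
		return err // same stream: schema redacted while templates stay Compatible
	}
	if err := mo.optimizeSend(arc, pwB1); err != nil {
		return err // different stream: full request minus traceID
	}
	// Having sent to more than one stream, the optimizer now reports multiplexing.
	_ = mo.isMultiplexing() // true
	return nil
}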

// versionedTemplate is used for faster comparison of the templated part of
// an AppendRowsRequest, which bears settings-like fields related to schema
// and default value configuration.  Direct proto comparison through something
// like proto.Equal is far too expensive, so versionedTemplate leverages a faster
// hash-based comparison to avoid the deep equality checks.
type versionedTemplate struct {
	versionTime time.Time
	hashVal     uint32
	tmpl        *storagepb.AppendRowsRequest
}

func newVersionedTemplate() *versionedTemplate {
	vt := &versionedTemplate{
		versionTime: time.Now(),
		tmpl:        &storagepb.AppendRowsRequest{},
	}
	vt.computeHash()
	return vt
}

// computeHash is an internal utility function for calculating the hash value
// for faster comparison.
func (vt *versionedTemplate) computeHash() {
	buf := new(bytes.Buffer)
	if b, err := proto.Marshal(vt.tmpl); err == nil {
		buf.Write(b)
	} else {
		// If we fail to serialize the proto (unlikely), consume the timestamp for input instead.
		binary.Write(buf, binary.LittleEndian, vt.versionTime.UnixNano())
	}
	vt.hashVal = crc32.ChecksumIEEE(buf.Bytes())
}

type templateRevisionF func(m *storagepb.AppendRowsRequest)

// revise makes a new versionedTemplate from the existing template, applying any changes.
// The original revision is returned if there's no effective difference after changes are
// applied.
func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate {
	before := vt
	if before == nil {
		before = newVersionedTemplate()
	}
	if len(changes) == 0 {
		// If there are no changes, return the base revision immediately.
		return before
	}
	out := &versionedTemplate{
		versionTime: time.Now(),
		tmpl:        proto.Clone(before.tmpl).(*storagepb.AppendRowsRequest),
	}
	for _, r := range changes {
		r(out.tmpl)
	}
	out.computeHash()
	if out.Compatible(before) {
		// The changes didn't yield a measured difference.  Return the base revision to avoid
		// possible connection churn from no-op revisions.
		return before
	}
	return out
}

// Compatible is effectively a fast equality check that relies on the hash value
// and avoids the potentially very costly deep comparison of the proto message templates.
func (vt *versionedTemplate) Compatible(other *versionedTemplate) bool {
	if vt == nil || other == nil {
		// A nil template is only compatible with another nil template; this also
		// guards against a nil-receiver panic.
		return vt == other
	}
	return vt.hashVal == other.hashVal
}
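
// A hypothetical illustration of revision semantics; exampleReviseCompare is
// not part of this package and exists only to show how revise and Compatible
// interact.  The generated enum value name is assumed correct here.
func exampleReviseCompare() {
	base := newVersionedTemplate()
	// A no-op revise returns the base revision itself, so the hashes match.
	_ = base.Compatible(base.revise()) // true
	// An effective change yields a new revision with a differing hash.
	next := base.revise(reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE))
	_ = base.Compatible(next) // false: a multiplexed send would resend the full template
}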

func reviseProtoSchema(newSchema *descriptorpb.DescriptorProto) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m != nil {
			m.Rows = &storagepb.AppendRowsRequest_ProtoRows{
				ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
					WriterSchema: &storagepb.ProtoSchema{
						ProtoDescriptor: proto.Clone(newSchema).(*descriptorpb.DescriptorProto),
					},
				},
			}
		}
	}
}

func reviseMissingValueInterpretations(vi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m != nil {
			m.MissingValueInterpretations = vi
		}
	}
}

func reviseDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m != nil {
			m.DefaultMissingValueInterpretation = def
		}
	}
}
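// A hypothetical sketch of how a caller might evolve a template when the
// writer's descriptor changes; exampleEvolveSchema is illustrative and not
// part of this package's API.
func exampleEvolveSchema(cur *versionedTemplate, dp *descriptorpb.DescriptorProto) *versionedTemplate {
	next := cur.revise(reviseProtoSchema(dp))
	if next.Compatible(cur) {
		// No effective schema change; revise returned the original revision,
		// avoiding needless connection churn.
		return cur
	}
	// A changed template: multiplexed default-stream connections can send the
	// new WriterSchema in-band, while explicit streams must reconnect.
	return next
}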
