...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/send_optimizer_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2023 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"io"
    20  	"testing"
    21  
    22  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    23  	"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
    24  	"github.com/google/go-cmp/cmp"
    25  	"google.golang.org/protobuf/proto"
    26  	"google.golang.org/protobuf/reflect/protodesc"
    27  	"google.golang.org/protobuf/testing/protocmp"
    28  	"google.golang.org/protobuf/types/descriptorpb"
    29  )
    30  
    31  func TestSendOptimizer(t *testing.T) {
    32  
    33  	exampleReq := &storagepb.AppendRowsRequest{
    34  		Rows: &storagepb.AppendRowsRequest_ProtoRows{
    35  			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
    36  				Rows: &storagepb.ProtoRows{
    37  					SerializedRows: [][]byte{[]byte("row_data")},
    38  				},
    39  			},
    40  		},
    41  	}
    42  	exampleStreamID := "foo"
    43  	exampleTraceID := "trace_id"
    44  	exampleReqFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
    45  	exampleReqFull.WriteStream = exampleStreamID
    46  	exampleReqFull.TraceId = buildTraceID(&streamSettings{TraceID: exampleTraceID})
    47  	exampleDP := &descriptorpb.DescriptorProto{Name: proto.String("schema")}
    48  	exampleReqFull.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
    49  		ProtoDescriptor: proto.Clone(exampleDP).(*descriptorpb.DescriptorProto),
    50  	}
    51  
    52  	ctx := context.Background()
    53  
    54  	var testCases = []struct {
    55  		description string
    56  		optimizer   sendOptimizer
    57  		reqs        []*pendingWrite
    58  		sendResults []error
    59  		wantReqs    []*storagepb.AppendRowsRequest
    60  	}{
    61  		{
    62  			description: "verbose-optimizer",
    63  			optimizer:   &verboseOptimizer{},
    64  			reqs: func() []*pendingWrite {
    65  				tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
    66  				return []*pendingWrite{
    67  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
    68  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
    69  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
    70  				}
    71  			}(),
    72  			sendResults: []error{
    73  				nil,
    74  				io.EOF,
    75  				io.EOF,
    76  			},
    77  			wantReqs: []*storagepb.AppendRowsRequest{
    78  				proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
    79  				proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
    80  				proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
    81  			},
    82  		},
    83  		{
    84  			description: "simplex no errors",
    85  			optimizer:   &simplexOptimizer{},
    86  			reqs: func() []*pendingWrite {
    87  				tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
    88  				return []*pendingWrite{
    89  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
    90  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
    91  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
    92  				}
    93  			}(),
    94  			sendResults: []error{
    95  				nil,
    96  				nil,
    97  				nil,
    98  			},
    99  			wantReqs: func() []*storagepb.AppendRowsRequest {
   100  				want := make([]*storagepb.AppendRowsRequest, 3)
   101  				// first has no redactions.
   102  				want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
   103  				req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
   104  				req.GetProtoRows().WriterSchema = nil
   105  				req.TraceId = ""
   106  				req.WriteStream = ""
   107  				// second and third are optimized.
   108  				want[1] = req
   109  				want[2] = req
   110  				return want
   111  			}(),
   112  		},
   113  		{
   114  			description: "simplex w/partial errors",
   115  			optimizer:   &simplexOptimizer{},
   116  			reqs: func() []*pendingWrite {
   117  				tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
   118  				return []*pendingWrite{
   119  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   120  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   121  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   122  				}
   123  			}(),
   124  			sendResults: []error{
   125  				nil,
   126  				io.EOF,
   127  				nil,
   128  			},
   129  			wantReqs: func() []*storagepb.AppendRowsRequest {
   130  				want := make([]*storagepb.AppendRowsRequest, 3)
   131  				want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
   132  				req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
   133  				req.GetProtoRows().WriterSchema = nil
   134  				req.TraceId = ""
   135  				req.WriteStream = ""
   136  				// second request is optimized
   137  				want[1] = req
   138  				// error causes third request to be full again.
   139  				want[2] = want[0]
   140  				return want
   141  			}(),
   142  		},
   143  		{
   144  			description: "multiplex single all errors",
   145  			optimizer:   &multiplexOptimizer{},
   146  			reqs: func() []*pendingWrite {
   147  				tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
   148  				return []*pendingWrite{
   149  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   150  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   151  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   152  				}
   153  			}(),
   154  			sendResults: []error{
   155  				io.EOF,
   156  				io.EOF,
   157  				io.EOF,
   158  			},
   159  			wantReqs: []*storagepb.AppendRowsRequest{
   160  				proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
   161  				proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
   162  				proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
   163  			},
   164  		},
   165  		{
   166  			description: "multiplex single no errors",
   167  			optimizer:   &multiplexOptimizer{},
   168  			reqs: func() []*pendingWrite {
   169  				tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
   170  				return []*pendingWrite{
   171  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   172  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   173  					newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
   174  				}
   175  			}(),
   176  			sendResults: []error{
   177  				nil,
   178  				nil,
   179  				nil,
   180  			},
   181  			wantReqs: func() []*storagepb.AppendRowsRequest {
   182  				want := make([]*storagepb.AppendRowsRequest, 3)
   183  				want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
   184  				req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
   185  				req.GetProtoRows().WriterSchema = nil
   186  				req.TraceId = ""
   187  				want[1] = req
   188  				want[2] = req
   189  				return want
   190  			}(),
   191  		},
   192  		{
   193  			description: "multiplex interleave",
   194  			optimizer:   &multiplexOptimizer{},
   195  			reqs: func() []*pendingWrite {
   196  				tmplA := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
   197  				tmplB := newVersionedTemplate().revise(reviseProtoSchema(protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())))
   198  
   199  				reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
   200  				reqA.WriteStream = "alpha"
   201  
   202  				reqB := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
   203  				reqB.WriteStream = "beta"
   204  
   205  				writes := make([]*pendingWrite, 10)
   206  				writes[0] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
   207  				writes[1] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
   208  				writes[2] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
   209  				writes[3] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
   210  				writes[4] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
   211  				writes[5] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
   212  				writes[6] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
   213  				writes[7] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
   214  				writes[8] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
   215  				writes[9] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
   216  
   217  				return writes
   218  			}(),
   219  			sendResults: []error{
   220  				nil,
   221  				nil,
   222  				nil,
   223  				nil,
   224  				nil,
   225  				io.EOF,
   226  				nil,
   227  				nil,
   228  				nil,
   229  				io.EOF,
   230  			},
   231  			wantReqs: func() []*storagepb.AppendRowsRequest {
   232  				want := make([]*storagepb.AppendRowsRequest, 10)
   233  
   234  				wantReqAFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
   235  				wantReqAFull.WriteStream = "alpha"
   236  
   237  				wantReqANoTrace := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
   238  				wantReqANoTrace.TraceId = ""
   239  
   240  				wantReqAOpt := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
   241  				wantReqAOpt.GetProtoRows().WriterSchema = nil
   242  				wantReqAOpt.TraceId = ""
   243  
   244  				wantReqBFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
   245  				wantReqBFull.WriteStream = "beta"
   246  				wantReqBFull.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())
   247  
   248  				wantReqBNoTrace := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
   249  				wantReqBNoTrace.TraceId = ""
   250  
   251  				wantReqBOpt := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
   252  				wantReqBOpt.GetProtoRows().WriterSchema = nil
   253  				wantReqBOpt.TraceId = ""
   254  
   255  				want[0] = wantReqAFull
   256  				want[1] = wantReqAOpt
   257  				want[2] = wantReqBNoTrace
   258  				want[3] = wantReqANoTrace
   259  				want[4] = wantReqBNoTrace
   260  				want[5] = wantReqBOpt
   261  				want[6] = wantReqBFull
   262  				want[7] = wantReqBOpt
   263  				want[8] = wantReqANoTrace
   264  				want[9] = wantReqAOpt
   265  
   266  				return want
   267  			}(),
   268  		},
   269  		{
   270  			description: "multiplex w/evolution",
   271  			optimizer:   &multiplexOptimizer{},
   272  			reqs: func() []*pendingWrite {
   273  				tmplOld := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
   274  				tmplNew := tmplOld.revise(reviseProtoSchema(&descriptorpb.DescriptorProto{Name: proto.String("new")}))
   275  
   276  				example := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
   277  
   278  				writes := make([]*pendingWrite, 4)
   279  				writes[0] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID)
   280  				writes[1] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID)
   281  				writes[2] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID)
   282  				writes[3] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID)
   283  
   284  				return writes
   285  			}(),
   286  			sendResults: []error{
   287  				nil,
   288  				nil,
   289  				nil,
   290  				nil,
   291  			},
   292  			wantReqs: func() []*storagepb.AppendRowsRequest {
   293  				want := make([]*storagepb.AppendRowsRequest, 4)
   294  
   295  				wantBaseReqFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
   296  
   297  				wantBaseReqOpt := proto.Clone(wantBaseReqFull).(*storagepb.AppendRowsRequest)
   298  				wantBaseReqOpt.TraceId = ""
   299  				wantBaseReqOpt.GetProtoRows().WriterSchema = nil
   300  
   301  				wantEvolved := proto.Clone(wantBaseReqOpt).(*storagepb.AppendRowsRequest)
   302  				wantEvolved.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
   303  					ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("new")},
   304  				}
   305  
   306  				want[0] = wantBaseReqFull
   307  				want[1] = wantBaseReqOpt
   308  				want[2] = wantEvolved
   309  				want[3] = wantBaseReqOpt
   310  				return want
   311  			}(),
   312  		},
   313  	}
   314  
   315  	for _, tc := range testCases {
   316  		testARC := &testAppendRowsClient{}
   317  		testARC.sendF = func(req *storagepb.AppendRowsRequest) error {
   318  			testARC.requests = append(testARC.requests, proto.Clone(req).(*storagepb.AppendRowsRequest))
   319  			respErr := tc.sendResults[0]
   320  			tc.sendResults = tc.sendResults[1:]
   321  			return respErr
   322  		}
   323  
   324  		for _, req := range tc.reqs {
   325  			err := tc.optimizer.optimizeSend(testARC, req)
   326  			if err != nil {
   327  				tc.optimizer.signalReset()
   328  			}
   329  		}
   330  		// now, compare.
   331  		for k, wr := range tc.wantReqs {
   332  			if diff := cmp.Diff(testARC.requests[k], wr, protocmp.Transform()); diff != "" {
   333  				t.Errorf("%s (req %d) mismatch: -got, +want:\n%s", tc.description, k, diff)
   334  			}
   335  		}
   336  	}
   337  }
   338  
   339  func TestVersionedTemplate(t *testing.T) {
   340  	testCases := []struct {
   341  		desc           string
   342  		inputTmpl      *storagepb.AppendRowsRequest
   343  		changes        []templateRevisionF
   344  		wantCompatible bool
   345  	}{
   346  		{
   347  			desc:           "nil template",
   348  			wantCompatible: true,
   349  		},
   350  		{
   351  			desc:           "no changes",
   352  			inputTmpl:      &storagepb.AppendRowsRequest{},
   353  			wantCompatible: true,
   354  		},
   355  		{
   356  			desc:      "empty schema",
   357  			inputTmpl: &storagepb.AppendRowsRequest{},
   358  			changes: []templateRevisionF{
   359  				reviseProtoSchema(nil),
   360  			},
   361  			wantCompatible: false,
   362  		},
   363  		{
   364  			desc: "same default mvi",
   365  			inputTmpl: &storagepb.AppendRowsRequest{
   366  				DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE,
   367  			},
   368  			changes: []templateRevisionF{
   369  				reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_NULL_VALUE),
   370  			},
   371  			wantCompatible: true,
   372  		},
   373  		{
   374  			desc: "differing default mvi",
   375  			inputTmpl: &storagepb.AppendRowsRequest{
   376  				DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE,
   377  			},
   378  			changes: []templateRevisionF{
   379  				reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE),
   380  			},
   381  			wantCompatible: false,
   382  		},
   383  	}
   384  
   385  	for _, tc := range testCases {
   386  		orig := newVersionedTemplate()
   387  		orig.tmpl = tc.inputTmpl
   388  		orig.computeHash()
   389  
   390  		rev := orig.revise(tc.changes...)
   391  		if orig.Compatible(rev) != rev.Compatible(orig) {
   392  			t.Errorf("case %q: inconsistent compatibility, orig %t rev %t", tc.desc, orig.Compatible(rev), rev.Compatible(orig))
   393  		}
   394  		if got := orig.Compatible(rev); tc.wantCompatible != got {
   395  			t.Errorf("case %q: Compatible mismatch, got %t want %t", tc.desc, got, tc.wantCompatible)
   396  		}
   397  	}
   398  }
   399  

View as plain text