// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package managedwriter import ( "context" "io" "testing" "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "cloud.google.com/go/bigquery/storage/managedwriter/testdata" "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protodesc" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/descriptorpb" ) func TestSendOptimizer(t *testing.T) { exampleReq := &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ Rows: &storagepb.ProtoRows{ SerializedRows: [][]byte{[]byte("row_data")}, }, }, }, } exampleStreamID := "foo" exampleTraceID := "trace_id" exampleReqFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) exampleReqFull.WriteStream = exampleStreamID exampleReqFull.TraceId = buildTraceID(&streamSettings{TraceID: exampleTraceID}) exampleDP := &descriptorpb.DescriptorProto{Name: proto.String("schema")} exampleReqFull.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ ProtoDescriptor: proto.Clone(exampleDP).(*descriptorpb.DescriptorProto), } ctx := context.Background() var testCases = []struct { description string optimizer sendOptimizer reqs []*pendingWrite sendResults []error wantReqs []*storagepb.AppendRowsRequest }{ { description: "verbose-optimizer", optimizer: &verboseOptimizer{}, reqs: func() []*pendingWrite { tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ nil, io.EOF, io.EOF, }, wantReqs: []*storagepb.AppendRowsRequest{ proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest), proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest), proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest), }, }, { description: "simplex no errors", optimizer: &simplexOptimizer{}, reqs: func() []*pendingWrite { tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ nil, nil, nil, }, wantReqs: func() []*storagepb.AppendRowsRequest { want := make([]*storagepb.AppendRowsRequest, 3) // first has no redactions. want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest) req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest) req.GetProtoRows().WriterSchema = nil req.TraceId = "" req.WriteStream = "" // second and third are optimized. want[1] = req want[2] = req return want }(), }, { description: "simplex w/partial errors", optimizer: &simplexOptimizer{}, reqs: func() []*pendingWrite { tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ nil, io.EOF, nil, }, wantReqs: func() []*storagepb.AppendRowsRequest { want := make([]*storagepb.AppendRowsRequest, 3) want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest) req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest) req.GetProtoRows().WriterSchema = nil req.TraceId = "" req.WriteStream = "" // second request is optimized want[1] = req // error causes third request to be full again. want[2] = want[0] return want }(), }, { description: "multiplex single all errors", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ io.EOF, io.EOF, io.EOF, }, wantReqs: []*storagepb.AppendRowsRequest{ proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest), proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest), proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest), }, }, { description: "multiplex single no errors", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ nil, nil, nil, }, wantReqs: func() []*storagepb.AppendRowsRequest { want := make([]*storagepb.AppendRowsRequest, 3) want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest) req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest) req.GetProtoRows().WriterSchema = nil req.TraceId = "" want[1] = req want[2] = req return want }(), }, { description: "multiplex interleave", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { tmplA := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) tmplB := newVersionedTemplate().revise(reviseProtoSchema(protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor()))) reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) reqA.WriteStream = "alpha" reqB := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) reqB.WriteStream = "beta" writes := make([]*pendingWrite, 10) writes[0] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) writes[1] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) writes[2] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) writes[3] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) writes[4] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) writes[5] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) writes[6] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) writes[7] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) writes[8] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) writes[9] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) return writes }(), sendResults: []error{ nil, nil, nil, nil, nil, io.EOF, nil, nil, nil, io.EOF, }, wantReqs: func() []*storagepb.AppendRowsRequest { want := make([]*storagepb.AppendRowsRequest, 10) wantReqAFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest) wantReqAFull.WriteStream = "alpha" wantReqANoTrace := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest) wantReqANoTrace.TraceId = "" wantReqAOpt := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest) wantReqAOpt.GetProtoRows().WriterSchema = nil wantReqAOpt.TraceId = "" wantReqBFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest) wantReqBFull.WriteStream = "beta" wantReqBFull.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor()) wantReqBNoTrace := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest) wantReqBNoTrace.TraceId = "" wantReqBOpt := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest) wantReqBOpt.GetProtoRows().WriterSchema = nil wantReqBOpt.TraceId = "" want[0] = wantReqAFull want[1] = wantReqAOpt want[2] = wantReqBNoTrace want[3] = wantReqANoTrace want[4] = wantReqBNoTrace want[5] = wantReqBOpt want[6] = wantReqBFull want[7] = wantReqBOpt want[8] = wantReqANoTrace want[9] = wantReqAOpt return want }(), }, { description: "multiplex w/evolution", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { tmplOld := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) tmplNew := tmplOld.revise(reviseProtoSchema(&descriptorpb.DescriptorProto{Name: proto.String("new")})) example := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) writes := make([]*pendingWrite, 4) writes[0] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID) writes[1] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID) writes[2] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID) writes[3] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID) return writes }(), sendResults: []error{ nil, nil, nil, nil, }, wantReqs: func() []*storagepb.AppendRowsRequest { want := make([]*storagepb.AppendRowsRequest, 4) wantBaseReqFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest) wantBaseReqOpt := proto.Clone(wantBaseReqFull).(*storagepb.AppendRowsRequest) wantBaseReqOpt.TraceId = "" wantBaseReqOpt.GetProtoRows().WriterSchema = nil wantEvolved := proto.Clone(wantBaseReqOpt).(*storagepb.AppendRowsRequest) wantEvolved.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("new")}, } want[0] = wantBaseReqFull want[1] = wantBaseReqOpt want[2] = wantEvolved want[3] = wantBaseReqOpt return want }(), }, } for _, tc := range testCases { testARC := &testAppendRowsClient{} testARC.sendF = func(req *storagepb.AppendRowsRequest) error { testARC.requests = append(testARC.requests, proto.Clone(req).(*storagepb.AppendRowsRequest)) respErr := tc.sendResults[0] tc.sendResults = tc.sendResults[1:] return respErr } for _, req := range tc.reqs { err := tc.optimizer.optimizeSend(testARC, req) if err != nil { tc.optimizer.signalReset() } } // now, compare. for k, wr := range tc.wantReqs { if diff := cmp.Diff(testARC.requests[k], wr, protocmp.Transform()); diff != "" { t.Errorf("%s (req %d) mismatch: -got, +want:\n%s", tc.description, k, diff) } } } } func TestVersionedTemplate(t *testing.T) { testCases := []struct { desc string inputTmpl *storagepb.AppendRowsRequest changes []templateRevisionF wantCompatible bool }{ { desc: "nil template", wantCompatible: true, }, { desc: "no changes", inputTmpl: &storagepb.AppendRowsRequest{}, wantCompatible: true, }, { desc: "empty schema", inputTmpl: &storagepb.AppendRowsRequest{}, changes: []templateRevisionF{ reviseProtoSchema(nil), }, wantCompatible: false, }, { desc: "same default mvi", inputTmpl: &storagepb.AppendRowsRequest{ DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE, }, changes: []templateRevisionF{ reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_NULL_VALUE), }, wantCompatible: true, }, { desc: "differing default mvi", inputTmpl: &storagepb.AppendRowsRequest{ DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE, }, changes: []templateRevisionF{ reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE), }, wantCompatible: false, }, } for _, tc := range testCases { orig := newVersionedTemplate() orig.tmpl = tc.inputTmpl orig.computeHash() rev := orig.revise(tc.changes...) if orig.Compatible(rev) != rev.Compatible(orig) { t.Errorf("case %q: inconsistent compatibility, orig %t rev %t", tc.desc, orig.Compatible(rev), rev.Compatible(orig)) } if got := orig.Compatible(rev); tc.wantCompatible != got { t.Errorf("case %q: Compatible mismatch, got %t want %t", tc.desc, got, tc.wantCompatible) } } }