// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package managedwriter import ( "context" "fmt" "testing" "time" "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "github.com/google/go-cmp/cmp" "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/wrapperspb" ) func TestPendingWrite(t *testing.T) { ctx := context.Background() wantReq := &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ Rows: &storagepb.ProtoRows{ SerializedRows: [][]byte{ []byte("row1"), []byte("row2"), []byte("row3"), }, }, }, }, } // verify no offset behavior pending := newPendingWrite(ctx, nil, wantReq, nil, "", "") if pending.req.GetOffset() != nil { t.Errorf("request should have no offset, but is present: %q", pending.req.GetOffset().GetValue()) } if diff := cmp.Diff(pending.req, wantReq, protocmp.Transform()); diff != "" { t.Errorf("request mismatch: -got, +want:\n%s", diff) } // Verify request is not acknowledged. select { case <-pending.result.Ready(): t.Errorf("got Ready() on incomplete AppendResult") case <-time.After(100 * time.Millisecond): } // Mark completed, verify result. pending.markDone(&storage.AppendRowsResponse{}, nil) if gotOff := pending.result.offset(ctx); gotOff != NoStreamOffset { t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", gotOff, NoStreamOffset) } if pending.result.err != nil { t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err) } // Create new write to verify error result. pending = newPendingWrite(ctx, nil, wantReq, nil, "", "") // Manually invoke option to apply offset to request. // This would normally be appied as part of the AppendRows() method on the managed stream. wantOffset := int64(101) f := WithOffset(wantOffset) f(pending) if pending.req.GetOffset() == nil { t.Errorf("expected offset, got none") } if pending.req.GetOffset().GetValue() != wantOffset { t.Errorf("offset mismatch, got %d wanted %d", pending.req.GetOffset().GetValue(), wantOffset) } // Verify completion behavior with an error. wantErr := fmt.Errorf("foo") testResp := &storagepb.AppendRowsResponse{ Response: &storagepb.AppendRowsResponse_AppendResult_{ AppendResult: &storagepb.AppendRowsResponse_AppendResult{ Offset: &wrapperspb.Int64Value{ Value: wantOffset, }, }, }, } pending.markDone(testResp, wantErr) if pending.req != nil { t.Errorf("expected request to be cleared, is present: %#v", pending.req) } select { case <-time.After(100 * time.Millisecond): t.Errorf("possible blocking on completed AppendResult") case <-pending.result.Ready(): gotOffset, gotErr := pending.result.GetResult(ctx) if gotOffset != wantOffset { t.Errorf("GetResult: mismatch on completed AppendResult offset: got %d want %d", gotOffset, wantOffset) } if gotErr != wantErr { t.Errorf("GetResult: mismatch in errors, got %v want %v", gotErr, wantErr) } // Now, check FullResponse. gotResp, gotErr := pending.result.FullResponse(ctx) if gotErr != wantErr { t.Errorf("FullResponse: mismatch in errors, got %v want %v", gotErr, wantErr) } if diff := cmp.Diff(gotResp, testResp, protocmp.Transform()); diff != "" { t.Errorf("FullResponse diff: %s", diff) } } } func TestPendingWrite_ConstructFullRequest(t *testing.T) { testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")} testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP)) testEmptyTraceID := buildTraceID(&streamSettings{}) for _, tc := range []struct { desc string pw *pendingWrite addTrace bool want *storagepb.AppendRowsRequest }{ { desc: "nil request", pw: &pendingWrite{ reqTmpl: testTmpl, }, want: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ WriterSchema: &storagepb.ProtoSchema{ ProtoDescriptor: testDP, }, }, }, }, }, { desc: "empty req w/trace", pw: &pendingWrite{ req: &storagepb.AppendRowsRequest{}, reqTmpl: testTmpl, }, addTrace: true, want: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ WriterSchema: &storagepb.ProtoSchema{ ProtoDescriptor: testDP, }, }, }, TraceId: testEmptyTraceID, }, }, { desc: "basic req", pw: &pendingWrite{ req: &storagepb.AppendRowsRequest{}, reqTmpl: testTmpl, }, want: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ WriterSchema: &storagepb.ProtoSchema{ ProtoDescriptor: testDP, }, }, }, }, }, { desc: "everything w/trace", pw: &pendingWrite{ req: &storagepb.AppendRowsRequest{}, reqTmpl: testTmpl, traceID: "foo", writeStreamID: "streamid", }, addTrace: true, want: &storagepb.AppendRowsRequest{ WriteStream: "streamid", Rows: &storagepb.AppendRowsRequest_ProtoRows{ ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ WriterSchema: &storagepb.ProtoSchema{ ProtoDescriptor: testDP, }, }, }, TraceId: buildTraceID(&streamSettings{TraceID: "foo"}), }, }, } { got := tc.pw.constructFullRequest(tc.addTrace) if diff := cmp.Diff(got, tc.want, protocmp.Transform()); diff != "" { t.Errorf("%s diff: %s", tc.desc, diff) } } }