...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/appendresult_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"testing"
    21  	"time"
    22  
    23  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    24  	"github.com/google/go-cmp/cmp"
    25  	"google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
    26  	"google.golang.org/protobuf/proto"
    27  	"google.golang.org/protobuf/testing/protocmp"
    28  	"google.golang.org/protobuf/types/descriptorpb"
    29  	"google.golang.org/protobuf/types/known/wrapperspb"
    30  )
    31  
    32  func TestPendingWrite(t *testing.T) {
    33  	ctx := context.Background()
    34  	wantReq := &storagepb.AppendRowsRequest{
    35  		Rows: &storagepb.AppendRowsRequest_ProtoRows{
    36  			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
    37  				Rows: &storagepb.ProtoRows{
    38  					SerializedRows: [][]byte{
    39  						[]byte("row1"),
    40  						[]byte("row2"),
    41  						[]byte("row3"),
    42  					},
    43  				},
    44  			},
    45  		},
    46  	}
    47  
    48  	// verify no offset behavior
    49  	pending := newPendingWrite(ctx, nil, wantReq, nil, "", "")
    50  	if pending.req.GetOffset() != nil {
    51  		t.Errorf("request should have no offset, but is present: %q", pending.req.GetOffset().GetValue())
    52  	}
    53  
    54  	if diff := cmp.Diff(pending.req, wantReq, protocmp.Transform()); diff != "" {
    55  		t.Errorf("request mismatch: -got, +want:\n%s", diff)
    56  	}
    57  
    58  	// Verify request is not acknowledged.
    59  	select {
    60  	case <-pending.result.Ready():
    61  		t.Errorf("got Ready() on incomplete AppendResult")
    62  	case <-time.After(100 * time.Millisecond):
    63  
    64  	}
    65  
    66  	// Mark completed, verify result.
    67  	pending.markDone(&storage.AppendRowsResponse{}, nil)
    68  	if gotOff := pending.result.offset(ctx); gotOff != NoStreamOffset {
    69  		t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", gotOff, NoStreamOffset)
    70  	}
    71  	if pending.result.err != nil {
    72  		t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
    73  	}
    74  
    75  	// Create new write to verify error result.
    76  	pending = newPendingWrite(ctx, nil, wantReq, nil, "", "")
    77  
    78  	// Manually invoke option to apply offset to request.
    79  	// This would normally be appied as part of the AppendRows() method on the managed stream.
    80  	wantOffset := int64(101)
    81  	f := WithOffset(wantOffset)
    82  	f(pending)
    83  
    84  	if pending.req.GetOffset() == nil {
    85  		t.Errorf("expected offset, got none")
    86  	}
    87  	if pending.req.GetOffset().GetValue() != wantOffset {
    88  		t.Errorf("offset mismatch, got %d wanted %d", pending.req.GetOffset().GetValue(), wantOffset)
    89  	}
    90  
    91  	// Verify completion behavior with an error.
    92  	wantErr := fmt.Errorf("foo")
    93  
    94  	testResp := &storagepb.AppendRowsResponse{
    95  		Response: &storagepb.AppendRowsResponse_AppendResult_{
    96  			AppendResult: &storagepb.AppendRowsResponse_AppendResult{
    97  				Offset: &wrapperspb.Int64Value{
    98  					Value: wantOffset,
    99  				},
   100  			},
   101  		},
   102  	}
   103  	pending.markDone(testResp, wantErr)
   104  
   105  	if pending.req != nil {
   106  		t.Errorf("expected request to be cleared, is present: %#v", pending.req)
   107  	}
   108  
   109  	select {
   110  
   111  	case <-time.After(100 * time.Millisecond):
   112  		t.Errorf("possible blocking on completed AppendResult")
   113  	case <-pending.result.Ready():
   114  		gotOffset, gotErr := pending.result.GetResult(ctx)
   115  		if gotOffset != wantOffset {
   116  			t.Errorf("GetResult: mismatch on completed AppendResult offset: got %d want %d", gotOffset, wantOffset)
   117  		}
   118  		if gotErr != wantErr {
   119  			t.Errorf("GetResult: mismatch in errors, got %v want %v", gotErr, wantErr)
   120  		}
   121  		// Now, check FullResponse.
   122  		gotResp, gotErr := pending.result.FullResponse(ctx)
   123  		if gotErr != wantErr {
   124  			t.Errorf("FullResponse: mismatch in errors, got %v want %v", gotErr, wantErr)
   125  		}
   126  		if diff := cmp.Diff(gotResp, testResp, protocmp.Transform()); diff != "" {
   127  			t.Errorf("FullResponse diff: %s", diff)
   128  		}
   129  	}
   130  }
   131  
   132  func TestPendingWrite_ConstructFullRequest(t *testing.T) {
   133  
   134  	testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")}
   135  	testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP))
   136  
   137  	testEmptyTraceID := buildTraceID(&streamSettings{})
   138  
   139  	for _, tc := range []struct {
   140  		desc     string
   141  		pw       *pendingWrite
   142  		addTrace bool
   143  		want     *storagepb.AppendRowsRequest
   144  	}{
   145  		{
   146  			desc: "nil request",
   147  			pw: &pendingWrite{
   148  				reqTmpl: testTmpl,
   149  			},
   150  			want: &storagepb.AppendRowsRequest{
   151  				Rows: &storagepb.AppendRowsRequest_ProtoRows{
   152  					ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
   153  						WriterSchema: &storagepb.ProtoSchema{
   154  							ProtoDescriptor: testDP,
   155  						},
   156  					},
   157  				},
   158  			},
   159  		},
   160  		{
   161  			desc: "empty req w/trace",
   162  			pw: &pendingWrite{
   163  				req:     &storagepb.AppendRowsRequest{},
   164  				reqTmpl: testTmpl,
   165  			},
   166  			addTrace: true,
   167  			want: &storagepb.AppendRowsRequest{
   168  				Rows: &storagepb.AppendRowsRequest_ProtoRows{
   169  					ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
   170  						WriterSchema: &storagepb.ProtoSchema{
   171  							ProtoDescriptor: testDP,
   172  						},
   173  					},
   174  				},
   175  				TraceId: testEmptyTraceID,
   176  			},
   177  		},
   178  		{
   179  			desc: "basic req",
   180  			pw: &pendingWrite{
   181  				req:     &storagepb.AppendRowsRequest{},
   182  				reqTmpl: testTmpl,
   183  			},
   184  			want: &storagepb.AppendRowsRequest{
   185  				Rows: &storagepb.AppendRowsRequest_ProtoRows{
   186  					ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
   187  						WriterSchema: &storagepb.ProtoSchema{
   188  							ProtoDescriptor: testDP,
   189  						},
   190  					},
   191  				},
   192  			},
   193  		},
   194  		{
   195  			desc: "everything w/trace",
   196  			pw: &pendingWrite{
   197  				req:           &storagepb.AppendRowsRequest{},
   198  				reqTmpl:       testTmpl,
   199  				traceID:       "foo",
   200  				writeStreamID: "streamid",
   201  			},
   202  			addTrace: true,
   203  			want: &storagepb.AppendRowsRequest{
   204  				WriteStream: "streamid",
   205  				Rows: &storagepb.AppendRowsRequest_ProtoRows{
   206  					ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
   207  						WriterSchema: &storagepb.ProtoSchema{
   208  							ProtoDescriptor: testDP,
   209  						},
   210  					},
   211  				},
   212  				TraceId: buildTraceID(&streamSettings{TraceID: "foo"}),
   213  			},
   214  		},
   215  	} {
   216  		got := tc.pw.constructFullRequest(tc.addTrace)
   217  		if diff := cmp.Diff(got, tc.want, protocmp.Transform()); diff != "" {
   218  			t.Errorf("%s diff: %s", tc.desc, diff)
   219  		}
   220  	}
   221  }
   222  

View as plain text