1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "fmt"
20 "testing"
21 "time"
22
23 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
24 "github.com/google/go-cmp/cmp"
25 "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
26 "google.golang.org/protobuf/proto"
27 "google.golang.org/protobuf/testing/protocmp"
28 "google.golang.org/protobuf/types/descriptorpb"
29 "google.golang.org/protobuf/types/known/wrapperspb"
30 )
31
32 func TestPendingWrite(t *testing.T) {
33 ctx := context.Background()
34 wantReq := &storagepb.AppendRowsRequest{
35 Rows: &storagepb.AppendRowsRequest_ProtoRows{
36 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
37 Rows: &storagepb.ProtoRows{
38 SerializedRows: [][]byte{
39 []byte("row1"),
40 []byte("row2"),
41 []byte("row3"),
42 },
43 },
44 },
45 },
46 }
47
48
49 pending := newPendingWrite(ctx, nil, wantReq, nil, "", "")
50 if pending.req.GetOffset() != nil {
51 t.Errorf("request should have no offset, but is present: %q", pending.req.GetOffset().GetValue())
52 }
53
54 if diff := cmp.Diff(pending.req, wantReq, protocmp.Transform()); diff != "" {
55 t.Errorf("request mismatch: -got, +want:\n%s", diff)
56 }
57
58
59 select {
60 case <-pending.result.Ready():
61 t.Errorf("got Ready() on incomplete AppendResult")
62 case <-time.After(100 * time.Millisecond):
63
64 }
65
66
67 pending.markDone(&storage.AppendRowsResponse{}, nil)
68 if gotOff := pending.result.offset(ctx); gotOff != NoStreamOffset {
69 t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", gotOff, NoStreamOffset)
70 }
71 if pending.result.err != nil {
72 t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
73 }
74
75
76 pending = newPendingWrite(ctx, nil, wantReq, nil, "", "")
77
78
79
80 wantOffset := int64(101)
81 f := WithOffset(wantOffset)
82 f(pending)
83
84 if pending.req.GetOffset() == nil {
85 t.Errorf("expected offset, got none")
86 }
87 if pending.req.GetOffset().GetValue() != wantOffset {
88 t.Errorf("offset mismatch, got %d wanted %d", pending.req.GetOffset().GetValue(), wantOffset)
89 }
90
91
92 wantErr := fmt.Errorf("foo")
93
94 testResp := &storagepb.AppendRowsResponse{
95 Response: &storagepb.AppendRowsResponse_AppendResult_{
96 AppendResult: &storagepb.AppendRowsResponse_AppendResult{
97 Offset: &wrapperspb.Int64Value{
98 Value: wantOffset,
99 },
100 },
101 },
102 }
103 pending.markDone(testResp, wantErr)
104
105 if pending.req != nil {
106 t.Errorf("expected request to be cleared, is present: %#v", pending.req)
107 }
108
109 select {
110
111 case <-time.After(100 * time.Millisecond):
112 t.Errorf("possible blocking on completed AppendResult")
113 case <-pending.result.Ready():
114 gotOffset, gotErr := pending.result.GetResult(ctx)
115 if gotOffset != wantOffset {
116 t.Errorf("GetResult: mismatch on completed AppendResult offset: got %d want %d", gotOffset, wantOffset)
117 }
118 if gotErr != wantErr {
119 t.Errorf("GetResult: mismatch in errors, got %v want %v", gotErr, wantErr)
120 }
121
122 gotResp, gotErr := pending.result.FullResponse(ctx)
123 if gotErr != wantErr {
124 t.Errorf("FullResponse: mismatch in errors, got %v want %v", gotErr, wantErr)
125 }
126 if diff := cmp.Diff(gotResp, testResp, protocmp.Transform()); diff != "" {
127 t.Errorf("FullResponse diff: %s", diff)
128 }
129 }
130 }
131
132 func TestPendingWrite_ConstructFullRequest(t *testing.T) {
133
134 testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")}
135 testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP))
136
137 testEmptyTraceID := buildTraceID(&streamSettings{})
138
139 for _, tc := range []struct {
140 desc string
141 pw *pendingWrite
142 addTrace bool
143 want *storagepb.AppendRowsRequest
144 }{
145 {
146 desc: "nil request",
147 pw: &pendingWrite{
148 reqTmpl: testTmpl,
149 },
150 want: &storagepb.AppendRowsRequest{
151 Rows: &storagepb.AppendRowsRequest_ProtoRows{
152 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
153 WriterSchema: &storagepb.ProtoSchema{
154 ProtoDescriptor: testDP,
155 },
156 },
157 },
158 },
159 },
160 {
161 desc: "empty req w/trace",
162 pw: &pendingWrite{
163 req: &storagepb.AppendRowsRequest{},
164 reqTmpl: testTmpl,
165 },
166 addTrace: true,
167 want: &storagepb.AppendRowsRequest{
168 Rows: &storagepb.AppendRowsRequest_ProtoRows{
169 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
170 WriterSchema: &storagepb.ProtoSchema{
171 ProtoDescriptor: testDP,
172 },
173 },
174 },
175 TraceId: testEmptyTraceID,
176 },
177 },
178 {
179 desc: "basic req",
180 pw: &pendingWrite{
181 req: &storagepb.AppendRowsRequest{},
182 reqTmpl: testTmpl,
183 },
184 want: &storagepb.AppendRowsRequest{
185 Rows: &storagepb.AppendRowsRequest_ProtoRows{
186 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
187 WriterSchema: &storagepb.ProtoSchema{
188 ProtoDescriptor: testDP,
189 },
190 },
191 },
192 },
193 },
194 {
195 desc: "everything w/trace",
196 pw: &pendingWrite{
197 req: &storagepb.AppendRowsRequest{},
198 reqTmpl: testTmpl,
199 traceID: "foo",
200 writeStreamID: "streamid",
201 },
202 addTrace: true,
203 want: &storagepb.AppendRowsRequest{
204 WriteStream: "streamid",
205 Rows: &storagepb.AppendRowsRequest_ProtoRows{
206 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
207 WriterSchema: &storagepb.ProtoSchema{
208 ProtoDescriptor: testDP,
209 },
210 },
211 },
212 TraceId: buildTraceID(&streamSettings{TraceID: "foo"}),
213 },
214 },
215 } {
216 got := tc.pw.constructFullRequest(tc.addTrace)
217 if diff := cmp.Diff(got, tc.want, protocmp.Transform()); diff != "" {
218 t.Errorf("%s diff: %s", tc.desc, diff)
219 }
220 }
221 }
222
View as plain text