1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "io"
20 "testing"
21
22 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
23 "cloud.google.com/go/bigquery/storage/managedwriter/testdata"
24 "github.com/google/go-cmp/cmp"
25 "google.golang.org/protobuf/proto"
26 "google.golang.org/protobuf/reflect/protodesc"
27 "google.golang.org/protobuf/testing/protocmp"
28 "google.golang.org/protobuf/types/descriptorpb"
29 )
30
31 func TestSendOptimizer(t *testing.T) {
32
33 exampleReq := &storagepb.AppendRowsRequest{
34 Rows: &storagepb.AppendRowsRequest_ProtoRows{
35 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
36 Rows: &storagepb.ProtoRows{
37 SerializedRows: [][]byte{[]byte("row_data")},
38 },
39 },
40 },
41 }
42 exampleStreamID := "foo"
43 exampleTraceID := "trace_id"
44 exampleReqFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
45 exampleReqFull.WriteStream = exampleStreamID
46 exampleReqFull.TraceId = buildTraceID(&streamSettings{TraceID: exampleTraceID})
47 exampleDP := &descriptorpb.DescriptorProto{Name: proto.String("schema")}
48 exampleReqFull.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
49 ProtoDescriptor: proto.Clone(exampleDP).(*descriptorpb.DescriptorProto),
50 }
51
52 ctx := context.Background()
53
54 var testCases = []struct {
55 description string
56 optimizer sendOptimizer
57 reqs []*pendingWrite
58 sendResults []error
59 wantReqs []*storagepb.AppendRowsRequest
60 }{
61 {
62 description: "verbose-optimizer",
63 optimizer: &verboseOptimizer{},
64 reqs: func() []*pendingWrite {
65 tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
66 return []*pendingWrite{
67 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
68 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
69 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
70 }
71 }(),
72 sendResults: []error{
73 nil,
74 io.EOF,
75 io.EOF,
76 },
77 wantReqs: []*storagepb.AppendRowsRequest{
78 proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
79 proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
80 proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
81 },
82 },
83 {
84 description: "simplex no errors",
85 optimizer: &simplexOptimizer{},
86 reqs: func() []*pendingWrite {
87 tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
88 return []*pendingWrite{
89 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
90 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
91 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
92 }
93 }(),
94 sendResults: []error{
95 nil,
96 nil,
97 nil,
98 },
99 wantReqs: func() []*storagepb.AppendRowsRequest {
100 want := make([]*storagepb.AppendRowsRequest, 3)
101
102 want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
103 req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
104 req.GetProtoRows().WriterSchema = nil
105 req.TraceId = ""
106 req.WriteStream = ""
107
108 want[1] = req
109 want[2] = req
110 return want
111 }(),
112 },
113 {
114 description: "simplex w/partial errors",
115 optimizer: &simplexOptimizer{},
116 reqs: func() []*pendingWrite {
117 tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
118 return []*pendingWrite{
119 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
120 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
121 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
122 }
123 }(),
124 sendResults: []error{
125 nil,
126 io.EOF,
127 nil,
128 },
129 wantReqs: func() []*storagepb.AppendRowsRequest {
130 want := make([]*storagepb.AppendRowsRequest, 3)
131 want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
132 req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
133 req.GetProtoRows().WriterSchema = nil
134 req.TraceId = ""
135 req.WriteStream = ""
136
137 want[1] = req
138
139 want[2] = want[0]
140 return want
141 }(),
142 },
143 {
144 description: "multiplex single all errors",
145 optimizer: &multiplexOptimizer{},
146 reqs: func() []*pendingWrite {
147 tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
148 return []*pendingWrite{
149 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
150 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
151 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
152 }
153 }(),
154 sendResults: []error{
155 io.EOF,
156 io.EOF,
157 io.EOF,
158 },
159 wantReqs: []*storagepb.AppendRowsRequest{
160 proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
161 proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
162 proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
163 },
164 },
165 {
166 description: "multiplex single no errors",
167 optimizer: &multiplexOptimizer{},
168 reqs: func() []*pendingWrite {
169 tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
170 return []*pendingWrite{
171 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
172 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
173 newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
174 }
175 }(),
176 sendResults: []error{
177 nil,
178 nil,
179 nil,
180 },
181 wantReqs: func() []*storagepb.AppendRowsRequest {
182 want := make([]*storagepb.AppendRowsRequest, 3)
183 want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
184 req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
185 req.GetProtoRows().WriterSchema = nil
186 req.TraceId = ""
187 want[1] = req
188 want[2] = req
189 return want
190 }(),
191 },
192 {
193 description: "multiplex interleave",
194 optimizer: &multiplexOptimizer{},
195 reqs: func() []*pendingWrite {
196 tmplA := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
197 tmplB := newVersionedTemplate().revise(reviseProtoSchema(protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())))
198
199 reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
200 reqA.WriteStream = "alpha"
201
202 reqB := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
203 reqB.WriteStream = "beta"
204
205 writes := make([]*pendingWrite, 10)
206 writes[0] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
207 writes[1] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
208 writes[2] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
209 writes[3] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
210 writes[4] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
211 writes[5] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
212 writes[6] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
213 writes[7] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
214 writes[8] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
215 writes[9] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
216
217 return writes
218 }(),
219 sendResults: []error{
220 nil,
221 nil,
222 nil,
223 nil,
224 nil,
225 io.EOF,
226 nil,
227 nil,
228 nil,
229 io.EOF,
230 },
231 wantReqs: func() []*storagepb.AppendRowsRequest {
232 want := make([]*storagepb.AppendRowsRequest, 10)
233
234 wantReqAFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
235 wantReqAFull.WriteStream = "alpha"
236
237 wantReqANoTrace := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
238 wantReqANoTrace.TraceId = ""
239
240 wantReqAOpt := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
241 wantReqAOpt.GetProtoRows().WriterSchema = nil
242 wantReqAOpt.TraceId = ""
243
244 wantReqBFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
245 wantReqBFull.WriteStream = "beta"
246 wantReqBFull.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())
247
248 wantReqBNoTrace := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
249 wantReqBNoTrace.TraceId = ""
250
251 wantReqBOpt := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
252 wantReqBOpt.GetProtoRows().WriterSchema = nil
253 wantReqBOpt.TraceId = ""
254
255 want[0] = wantReqAFull
256 want[1] = wantReqAOpt
257 want[2] = wantReqBNoTrace
258 want[3] = wantReqANoTrace
259 want[4] = wantReqBNoTrace
260 want[5] = wantReqBOpt
261 want[6] = wantReqBFull
262 want[7] = wantReqBOpt
263 want[8] = wantReqANoTrace
264 want[9] = wantReqAOpt
265
266 return want
267 }(),
268 },
269 {
270 description: "multiplex w/evolution",
271 optimizer: &multiplexOptimizer{},
272 reqs: func() []*pendingWrite {
273 tmplOld := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
274 tmplNew := tmplOld.revise(reviseProtoSchema(&descriptorpb.DescriptorProto{Name: proto.String("new")}))
275
276 example := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
277
278 writes := make([]*pendingWrite, 4)
279 writes[0] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID)
280 writes[1] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID)
281 writes[2] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID)
282 writes[3] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID)
283
284 return writes
285 }(),
286 sendResults: []error{
287 nil,
288 nil,
289 nil,
290 nil,
291 },
292 wantReqs: func() []*storagepb.AppendRowsRequest {
293 want := make([]*storagepb.AppendRowsRequest, 4)
294
295 wantBaseReqFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
296
297 wantBaseReqOpt := proto.Clone(wantBaseReqFull).(*storagepb.AppendRowsRequest)
298 wantBaseReqOpt.TraceId = ""
299 wantBaseReqOpt.GetProtoRows().WriterSchema = nil
300
301 wantEvolved := proto.Clone(wantBaseReqOpt).(*storagepb.AppendRowsRequest)
302 wantEvolved.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
303 ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("new")},
304 }
305
306 want[0] = wantBaseReqFull
307 want[1] = wantBaseReqOpt
308 want[2] = wantEvolved
309 want[3] = wantBaseReqOpt
310 return want
311 }(),
312 },
313 }
314
315 for _, tc := range testCases {
316 testARC := &testAppendRowsClient{}
317 testARC.sendF = func(req *storagepb.AppendRowsRequest) error {
318 testARC.requests = append(testARC.requests, proto.Clone(req).(*storagepb.AppendRowsRequest))
319 respErr := tc.sendResults[0]
320 tc.sendResults = tc.sendResults[1:]
321 return respErr
322 }
323
324 for _, req := range tc.reqs {
325 err := tc.optimizer.optimizeSend(testARC, req)
326 if err != nil {
327 tc.optimizer.signalReset()
328 }
329 }
330
331 for k, wr := range tc.wantReqs {
332 if diff := cmp.Diff(testARC.requests[k], wr, protocmp.Transform()); diff != "" {
333 t.Errorf("%s (req %d) mismatch: -got, +want:\n%s", tc.description, k, diff)
334 }
335 }
336 }
337 }
338
339 func TestVersionedTemplate(t *testing.T) {
340 testCases := []struct {
341 desc string
342 inputTmpl *storagepb.AppendRowsRequest
343 changes []templateRevisionF
344 wantCompatible bool
345 }{
346 {
347 desc: "nil template",
348 wantCompatible: true,
349 },
350 {
351 desc: "no changes",
352 inputTmpl: &storagepb.AppendRowsRequest{},
353 wantCompatible: true,
354 },
355 {
356 desc: "empty schema",
357 inputTmpl: &storagepb.AppendRowsRequest{},
358 changes: []templateRevisionF{
359 reviseProtoSchema(nil),
360 },
361 wantCompatible: false,
362 },
363 {
364 desc: "same default mvi",
365 inputTmpl: &storagepb.AppendRowsRequest{
366 DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE,
367 },
368 changes: []templateRevisionF{
369 reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_NULL_VALUE),
370 },
371 wantCompatible: true,
372 },
373 {
374 desc: "differing default mvi",
375 inputTmpl: &storagepb.AppendRowsRequest{
376 DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE,
377 },
378 changes: []templateRevisionF{
379 reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE),
380 },
381 wantCompatible: false,
382 },
383 }
384
385 for _, tc := range testCases {
386 orig := newVersionedTemplate()
387 orig.tmpl = tc.inputTmpl
388 orig.computeHash()
389
390 rev := orig.revise(tc.changes...)
391 if orig.Compatible(rev) != rev.Compatible(orig) {
392 t.Errorf("case %q: inconsistent compatibility, orig %t rev %t", tc.desc, orig.Compatible(rev), rev.Compatible(orig))
393 }
394 if got := orig.Compatible(rev); tc.wantCompatible != got {
395 t.Errorf("case %q: Compatible mismatch, got %t want %t", tc.desc, got, tc.wantCompatible)
396 }
397 }
398 }
399
View as plain text