1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "fmt"
20 "math"
21 "sync"
22 "testing"
23 "time"
24
25 "cloud.google.com/go/bigquery"
26 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
27 "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
28 "cloud.google.com/go/bigquery/storage/managedwriter/testdata"
29 "cloud.google.com/go/internal/testutil"
30 "cloud.google.com/go/internal/uid"
31 "github.com/google/go-cmp/cmp"
32 "github.com/googleapis/gax-go/v2/apierror"
33 "go.opencensus.io/stats/view"
34 "google.golang.org/api/option"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/protobuf/encoding/protojson"
37 "google.golang.org/protobuf/proto"
38 "google.golang.org/protobuf/reflect/protodesc"
39 "google.golang.org/protobuf/reflect/protoreflect"
40 "google.golang.org/protobuf/testing/protocmp"
41 "google.golang.org/protobuf/types/descriptorpb"
42 "google.golang.org/protobuf/types/dynamicpb"
43 "google.golang.org/protobuf/types/known/wrapperspb"
44 )
45
46 var (
47 datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
48 tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
49 defaultTestTimeout = 90 * time.Second
50 )
51
52
53 var testSimpleData = []*testdata.SimpleMessageProto2{
54 {Name: proto.String("one"), Value: proto.Int64(1)},
55 {Name: proto.String("two"), Value: proto.Int64(2)},
56 {Name: proto.String("three"), Value: proto.Int64(3)},
57 {Name: proto.String("four"), Value: proto.Int64(1)},
58 {Name: proto.String("five"), Value: proto.Int64(2)},
59 }
60
61 func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
62 if testing.Short() {
63 t.Skip("Integration tests skipped in short mode")
64 }
65 projID := testutil.ProjID()
66 if projID == "" {
67 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
68 }
69 ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery")
70 if ts == nil {
71 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
72 }
73 opts = append(opts, option.WithTokenSource(ts))
74 client, err := NewClient(ctx, projID, opts...)
75 if err != nil {
76 t.Fatalf("couldn't create managedwriter client: %v", err)
77 }
78
79 bqClient, err := bigquery.NewClient(ctx, projID, opts...)
80 if err != nil {
81 t.Fatalf("couldn't create bigquery client: %v", err)
82 }
83 return client, bqClient
84 }
85
86
87 func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) {
88 dataset := bqc.Dataset(datasetIDs.New())
89 if err := dataset.Create(ctx, &bigquery.DatasetMetadata{Location: location}); err != nil {
90 return nil, nil, err
91 }
92 return dataset, func() {
93 if err := dataset.DeleteWithContents(ctx); err != nil {
94 t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err)
95 }
96 }, nil
97 }
98
99
100 func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
101 convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
102 if err != nil {
103 t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
104 }
105
106 descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root")
107 if err != nil {
108 t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err)
109 }
110 messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
111 if !ok {
112 t.Fatalf("adapted descriptor is not a message descriptor")
113 }
114 dp, err := adapt.NormalizeDescriptor(messageDescriptor)
115 if err != nil {
116 t.Fatalf("NormalizeDescriptor: %v", err)
117 }
118 return messageDescriptor, dp
119 }
120
121 func TestIntegration_ClientGetWriteStream(t *testing.T) {
122 ctx := context.Background()
123 mwClient, bqClient := getTestClients(ctx, t)
124 defer mwClient.Close()
125 defer bqClient.Close()
126
127 wantLocation := "us-east1"
128 dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
129 if err != nil {
130 t.Fatalf("failed to init test dataset: %v", err)
131 }
132 defer cleanup()
133
134 testTable := dataset.Table(tableIDs.New())
135 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
136 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
137 }
138
139 apiSchema, _ := adapt.BQSchemaToStorageTableSchema(testdata.SimpleMessageSchema)
140 parent := TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)
141 explicitStream, err := mwClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
142 Parent: parent,
143 WriteStream: &storagepb.WriteStream{
144 Type: storagepb.WriteStream_PENDING,
145 },
146 })
147 if err != nil {
148 t.Fatalf("CreateWriteStream: %v", err)
149 }
150
151 testCases := []struct {
152 description string
153 isDefault bool
154 streamID string
155 wantType storagepb.WriteStream_Type
156 }{
157 {
158 description: "default",
159 isDefault: true,
160 streamID: fmt.Sprintf("%s/streams/_default", parent),
161 wantType: storagepb.WriteStream_COMMITTED,
162 },
163 {
164 description: "explicit pending",
165 streamID: explicitStream.Name,
166 wantType: storagepb.WriteStream_PENDING,
167 },
168 }
169
170 for _, tc := range testCases {
171 for _, fullView := range []bool{false, true} {
172 info, err := mwClient.getWriteStream(ctx, tc.streamID, fullView)
173 if err != nil {
174 t.Errorf("%s (%T): getWriteStream failed: %v", tc.description, fullView, err)
175 }
176 if info.GetType() != tc.wantType {
177 t.Errorf("%s (%T): got type %d, want type %d", tc.description, fullView, info.GetType(), tc.wantType)
178 }
179 if info.GetLocation() != wantLocation {
180 t.Errorf("%s (%T) view: got location %s, want location %s", tc.description, fullView, info.GetLocation(), wantLocation)
181 }
182 if info.GetCommitTime() != nil {
183 t.Errorf("%s (%T)expected empty commit time, got %v", tc.description, fullView, info.GetCommitTime())
184 }
185
186 if !tc.isDefault {
187 if info.GetCreateTime() == nil {
188 t.Errorf("%s (%T): expected create time, was empty", tc.description, fullView)
189 }
190 } else {
191 if info.GetCreateTime() != nil {
192 t.Errorf("%s (%T): expected empty time, got %v", tc.description, fullView, info.GetCreateTime())
193 }
194 }
195
196 if !fullView {
197 if info.GetTableSchema() != nil {
198 t.Errorf("%s (%T) basic view: expected no schema, was populated", tc.description, fullView)
199 }
200 } else {
201 if diff := cmp.Diff(info.GetTableSchema(), apiSchema, protocmp.Transform()); diff != "" {
202 t.Errorf("%s (%T) schema mismatch: -got, +want:\n%s", tc.description, fullView, diff)
203 }
204 }
205 }
206 }
207 }
208
209 func TestIntegration_ManagedWriter(t *testing.T) {
210 mwClient, bqClient := getTestClients(context.Background(), t)
211 defer mwClient.Close()
212 defer bqClient.Close()
213
214 dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
215 if err != nil {
216 t.Fatalf("failed to init test dataset: %v", err)
217 }
218 defer cleanup()
219
220 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
221 defer cancel()
222
223 t.Run("group", func(t *testing.T) {
224 t.Run("DefaultStream", func(t *testing.T) {
225 t.Parallel()
226 testDefaultStream(ctx, t, mwClient, bqClient, dataset)
227 })
228 t.Run("DefaultStreamDynamicJSON", func(t *testing.T) {
229 t.Parallel()
230 testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
231 })
232 t.Run("CommittedStream", func(t *testing.T) {
233 t.Parallel()
234 testCommittedStream(ctx, t, mwClient, bqClient, dataset)
235 })
236 t.Run("ErrorBehaviors", func(t *testing.T) {
237 t.Parallel()
238 testErrorBehaviors(ctx, t, mwClient, bqClient, dataset)
239 })
240 t.Run("BufferedStream", func(t *testing.T) {
241 t.Parallel()
242 testBufferedStream(ctx, t, mwClient, bqClient, dataset)
243 })
244 t.Run("PendingStream", func(t *testing.T) {
245 t.Parallel()
246 testPendingStream(ctx, t, mwClient, bqClient, dataset)
247 })
248 t.Run("SimpleCDC", func(t *testing.T) {
249 t.Parallel()
250 testSimpleCDC(ctx, t, mwClient, bqClient, dataset)
251 })
252 t.Run("Instrumentation", func(t *testing.T) {
253
254 testInstrumentation(ctx, t, mwClient, bqClient, dataset)
255 })
256 t.Run("TestLargeInsertNoRetry", func(t *testing.T) {
257 testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset)
258 })
259 t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
260 testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
261 })
262 t.Run("DefaultValueHandling", func(t *testing.T) {
263 testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset)
264 })
265 })
266 }
267
268 func TestIntegration_SchemaEvolution(t *testing.T) {
269
270 testcases := []struct {
271 desc string
272 clientOpts []option.ClientOption
273 writerOpts []WriterOption
274 }{
275 {
276 desc: "Simplex_Committed",
277 writerOpts: []WriterOption{
278 WithType(CommittedStream),
279 },
280 },
281 {
282 desc: "Simplex_Default",
283 writerOpts: []WriterOption{
284 WithType(DefaultStream),
285 },
286 },
287 {
288 desc: "Multiplex_Default",
289 clientOpts: []option.ClientOption{
290 WithMultiplexing(),
291 WithMultiplexPoolLimit(2),
292 },
293 writerOpts: []WriterOption{
294 WithType(DefaultStream),
295 },
296 },
297 }
298
299 for _, tc := range testcases {
300 mwClient, bqClient := getTestClients(context.Background(), t, tc.clientOpts...)
301 defer mwClient.Close()
302 defer bqClient.Close()
303
304 dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
305 if err != nil {
306 t.Fatalf("failed to init test dataset: %v", err)
307 }
308 defer cleanup()
309
310 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
311 defer cancel()
312 t.Run(tc.desc, func(t *testing.T) {
313 testSchemaEvolution(ctx, t, mwClient, bqClient, dataset, tc.writerOpts...)
314 })
315 }
316 }
317
318 func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
319 testTable := dataset.Table(tableIDs.New())
320 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
321 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
322 }
323
324
325 m := &testdata.SimpleMessageProto2{}
326 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
327
328
329 ms, err := mwClient.NewManagedStream(ctx,
330 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
331 WithType(DefaultStream),
332 WithSchemaDescriptor(descriptorProto),
333 )
334 if err != nil {
335 t.Fatalf("NewManagedStream: %v", err)
336 }
337 if ms.id == "" {
338 t.Errorf("managed stream is missing ID")
339 }
340 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
341 withExactRowCount(0))
342
343
344 var result *AppendResult
345 for k, mesg := range testSimpleData {
346 b, err := proto.Marshal(mesg)
347 if err != nil {
348 t.Errorf("failed to marshal message %d: %v", k, err)
349 }
350 data := [][]byte{b}
351 result, err = ms.AppendRows(ctx, data)
352 if err != nil {
353 t.Errorf("single-row append %d failed: %v", k, err)
354 }
355 }
356
357 o, err := result.GetResult(ctx)
358 if err != nil {
359 t.Errorf("result error for last send: %v", err)
360 }
361 if o != NoStreamOffset {
362 t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
363 }
364 validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
365 withExactRowCount(int64(len(testSimpleData))),
366 withDistinctValues("name", int64(len(testSimpleData))))
367
368
369 var data [][]byte
370 for k, mesg := range testSimpleData {
371 b, err := proto.Marshal(mesg)
372 if err != nil {
373 t.Errorf("failed to marshal message %d: %v", k, err)
374 }
375 data = append(data, b)
376 }
377 result, err = ms.AppendRows(ctx, data)
378 if err != nil {
379 t.Errorf("grouped-row append failed: %v", err)
380 }
381
382
383 o, err = result.GetResult(ctx)
384 if err != nil {
385 t.Errorf("result error for last send: %v", err)
386 }
387 if o != NoStreamOffset {
388 t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
389 }
390 validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
391 withExactRowCount(int64(2*len(testSimpleData))),
392 withDistinctValues("name", int64(len(testSimpleData))),
393 withDistinctValues("value", int64(3)),
394 )
395 }
396
397 func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
398 testTable := dataset.Table(tableIDs.New())
399 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.GithubArchiveSchema}); err != nil {
400 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
401 }
402
403 md, descriptorProto := setupDynamicDescriptors(t, testdata.GithubArchiveSchema)
404
405 ms, err := mwClient.NewManagedStream(ctx,
406 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
407 WithType(DefaultStream),
408 WithSchemaDescriptor(descriptorProto),
409 )
410 if err != nil {
411 t.Fatalf("NewManagedStream: %v", err)
412 }
413 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
414 withExactRowCount(0))
415
416 sampleJSONData := [][]byte{
417 []byte(`{"type": "foo", "public": true, "repo": {"id": 99, "name": "repo_name_1", "url": "https://one.example.com"}}`),
418 []byte(`{"type": "bar", "public": false, "repo": {"id": 101, "name": "repo_name_2", "url": "https://two.example.com"}}`),
419 []byte(`{"type": "baz", "public": true, "repo": {"id": 456, "name": "repo_name_3", "url": "https://three.example.com"}}`),
420 []byte(`{"type": "wow", "public": false, "repo": {"id": 123, "name": "repo_name_4", "url": "https://four.example.com"}}`),
421 []byte(`{"type": "yay", "public": true, "repo": {"name": "repo_name_5", "url": "https://five.example.com"}}`),
422 }
423
424 var result *AppendResult
425 for k, v := range sampleJSONData {
426 message := dynamicpb.NewMessage(md)
427
428
429 err = protojson.Unmarshal(v, message)
430 if err != nil {
431 t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
432 }
433
434 b, err := proto.Marshal(message)
435 if err != nil {
436 t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
437 }
438 result, err = ms.AppendRows(ctx, [][]byte{b})
439 if err != nil {
440 t.Errorf("single-row append %d failed: %v", k, err)
441 }
442 }
443
444
445 o, err := result.GetResult(ctx)
446 if err != nil {
447 t.Errorf("result error for last send: %v", err)
448 }
449 if o != NoStreamOffset {
450 t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
451 }
452 validateTableConstraints(ctx, t, bqClient, testTable, "after send",
453 withExactRowCount(int64(len(sampleJSONData))),
454 withDistinctValues("type", int64(len(sampleJSONData))),
455 withDistinctValues("public", int64(2)))
456 }
457
458 func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
459 testTable := dataset.Table(tableIDs.New())
460 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
461 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
462 }
463
464 m := &testdata.SimpleMessageProto2{}
465 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
466
467 ms, err := mwClient.NewManagedStream(ctx,
468 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
469 WithType(BufferedStream),
470 WithSchemaDescriptor(descriptorProto),
471 )
472 if err != nil {
473 t.Fatalf("NewManagedStream: %v", err)
474 }
475
476 info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID, false)
477 if err != nil {
478 t.Errorf("couldn't get stream info: %v", err)
479 }
480 if info.GetType().String() != string(ms.StreamType()) {
481 t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
482 }
483 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
484 withExactRowCount(0))
485
486 var expectedRows int64
487 for k, mesg := range testSimpleData {
488 b, err := proto.Marshal(mesg)
489 if err != nil {
490 t.Fatalf("failed to marshal message %d: %v", k, err)
491 }
492 data := [][]byte{b}
493 results, err := ms.AppendRows(ctx, data)
494 if err != nil {
495 t.Fatalf("single-row append %d failed: %v", k, err)
496 }
497
498 offset, err := results.GetResult(ctx)
499 if err != nil {
500 t.Fatalf("got error from pending result %d: %v", k, err)
501 }
502 validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k),
503 withExactRowCount(expectedRows),
504 withDistinctValues("name", expectedRows))
505
506
507 flushOffset, err := ms.FlushRows(ctx, offset)
508 if err != nil {
509 t.Errorf("failed to flush offset to %d: %v", offset, err)
510 }
511 expectedRows = flushOffset + 1
512 validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k),
513 withExactRowCount(expectedRows),
514 withDistinctValues("name", expectedRows))
515 }
516 }
517
518 func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
519 testTable := dataset.Table(tableIDs.New())
520 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
521 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
522 }
523
524 m := &testdata.SimpleMessageProto2{}
525 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
526
527
528 ms, err := mwClient.NewManagedStream(ctx,
529 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
530 WithType(CommittedStream),
531 WithSchemaDescriptor(descriptorProto),
532 )
533 if err != nil {
534 t.Fatalf("NewManagedStream: %v", err)
535 }
536 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
537 withExactRowCount(0))
538
539 var result *AppendResult
540 for k, mesg := range testSimpleData {
541 b, err := proto.Marshal(mesg)
542 if err != nil {
543 t.Errorf("failed to marshal message %d: %v", k, err)
544 }
545 data := [][]byte{b}
546 result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
547 if err != nil {
548 t.Errorf("single-row append %d failed: %v", k, err)
549 }
550 }
551
552 o, err := result.GetResult(ctx)
553 if err != nil {
554 t.Errorf("result error for last send: %v", err)
555 }
556 wantOffset := int64(len(testSimpleData) - 1)
557 if o != wantOffset {
558 t.Errorf("offset mismatch, got %d want %d", o, wantOffset)
559 }
560 validateTableConstraints(ctx, t, bqClient, testTable, "after send",
561 withExactRowCount(int64(len(testSimpleData))))
562 }
563
564
565
566 func testSimpleCDC(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
567 testTable := dataset.Table(tableIDs.New())
568
569 if err := testTable.Create(ctx, &bigquery.TableMetadata{
570 Schema: testdata.ExampleEmployeeSchema,
571 Clustering: &bigquery.Clustering{
572 Fields: []string{"id"},
573 },
574 }); err != nil {
575 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
576 }
577
578
579 tableIdentifier, _ := testTable.Identifier(bigquery.StandardSQLID)
580 sql := fmt.Sprintf("ALTER TABLE %s ADD PRIMARY KEY(id) NOT ENFORCED;", tableIdentifier)
581 if _, err := bqClient.Query(sql).Read(ctx); err != nil {
582 t.Fatalf("failed ALTER TABLE: %v", err)
583 }
584
585 m := &testdata.ExampleEmployeeCDC{}
586 descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
587 if err != nil {
588 t.Fatalf("NormalizeDescriptor: %v", err)
589 }
590
591
592 writer, err := mwClient.NewManagedStream(ctx,
593 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
594 WithType(CommittedStream),
595 WithSchemaDescriptor(descriptorProto),
596 )
597 if err != nil {
598 t.Fatalf("NewManagedStream: %v", err)
599 }
600 defer writer.Close()
601 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
602 withExactRowCount(0))
603
604 initialEmployees := []*testdata.ExampleEmployeeCDC{
605 {
606 Id: proto.Int64(1),
607 Username: proto.String("alice"),
608 GivenName: proto.String("Alice CEO"),
609 Departments: []string{"product", "support", "internal"},
610 Salary: proto.Int64(1),
611 XCHANGE_TYPE: proto.String("INSERT"),
612 },
613 {
614 Id: proto.Int64(2),
615 Username: proto.String("bob"),
616 GivenName: proto.String("Bob Bobberson"),
617 Departments: []string{"research"},
618 Salary: proto.Int64(100000),
619 XCHANGE_TYPE: proto.String("INSERT"),
620 },
621 {
622 Id: proto.Int64(3),
623 Username: proto.String("clarice"),
624 GivenName: proto.String("Clarice Clearwater"),
625 Departments: []string{"product"},
626 Salary: proto.Int64(100001),
627 XCHANGE_TYPE: proto.String("INSERT"),
628 },
629 }
630
631
632 data := make([][]byte, len(initialEmployees))
633 for k, mesg := range initialEmployees {
634 b, err := proto.Marshal(mesg)
635 if err != nil {
636 t.Fatalf("failed to marshal record %d: %v", k, err)
637 }
638 data[k] = b
639 }
640 result, err := writer.AppendRows(ctx, data)
641 if err != nil {
642 t.Errorf("initial insert failed (%s): %v", writer.StreamName(), err)
643 }
644 if _, err := result.GetResult(ctx); err != nil {
645 t.Errorf("result error for initial insert (%s): %v", writer.StreamName(), err)
646 }
647 validateTableConstraints(ctx, t, bqClient, testTable, "initial inserts",
648 withExactRowCount(int64(len(initialEmployees))))
649
650
651 updateWriter, err := mwClient.NewManagedStream(ctx,
652 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
653 WithType(DefaultStream),
654 WithSchemaDescriptor(descriptorProto),
655 )
656 if err != nil {
657 t.Fatalf("NewManagedStream: %v", err)
658 }
659 defer updateWriter.Close()
660
661
662 newBob := proto.Clone(initialEmployees[1]).(*testdata.ExampleEmployeeCDC)
663 newBob.Salary = proto.Int64(105000)
664 newBob.Departments = []string{"research", "product"}
665 newBob.XCHANGE_TYPE = proto.String("UPSERT")
666 b, err := proto.Marshal(newBob)
667 if err != nil {
668 t.Fatalf("failed to marshal new bob: %v", err)
669 }
670 result, err = updateWriter.AppendRows(ctx, [][]byte{b})
671 if err != nil {
672 t.Fatalf("bob modification failed (%s): %v", updateWriter.StreamName(), err)
673 }
674 if _, err := result.GetResult(ctx); err != nil {
675 t.Fatalf("result error for bob modification (%s): %v", updateWriter.StreamName(), err)
676 }
677 validateTableConstraints(ctx, t, bqClient, testTable, "after bob modification",
678 withExactRowCount(int64(len(initialEmployees))),
679 withDistinctValues("id", int64(len(initialEmployees))))
680
681
682 removeClarice := &testdata.ExampleEmployeeCDC{
683 Id: proto.Int64(3),
684 XCHANGE_TYPE: proto.String("DELETE"),
685 }
686 b, err = proto.Marshal(removeClarice)
687 if err != nil {
688 t.Fatalf("failed to marshal clarice removal: %v", err)
689 }
690 result, err = updateWriter.AppendRows(ctx, [][]byte{b})
691 if err != nil {
692 t.Fatalf("clarice removal failed (%s): %v", updateWriter.StreamName(), err)
693 }
694 if _, err := result.GetResult(ctx); err != nil {
695 t.Fatalf("result error for clarice removal (%s): %v", updateWriter.StreamName(), err)
696 }
697
698 validateTableConstraints(ctx, t, bqClient, testTable, "after clarice removal",
699 withExactRowCount(int64(len(initialEmployees))-1))
700 }
701
702
703 func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
704 testTable := dataset.Table(tableIDs.New())
705 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
706 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
707 }
708
709 m := &testdata.SimpleMessageProto2{}
710 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
711
712
713 ms, err := mwClient.NewManagedStream(ctx,
714 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
715 WithType(CommittedStream),
716 WithSchemaDescriptor(descriptorProto),
717 )
718 if err != nil {
719 t.Fatalf("NewManagedStream: %v", err)
720 }
721 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
722 withExactRowCount(0))
723
724 data := make([][]byte, len(testSimpleData))
725 for k, mesg := range testSimpleData {
726 b, err := proto.Marshal(mesg)
727 if err != nil {
728 t.Errorf("failed to marshal message %d: %v", k, err)
729 }
730 data[k] = b
731 }
732
733
734 result, err := ms.AppendRows(ctx, data, WithOffset(99))
735 if err != nil {
736 t.Errorf("failed to send append: %v", err)
737 }
738
739 off, err := result.GetResult(ctx)
740 if err == nil {
741 t.Errorf("expected error, got offset %d", off)
742 }
743
744 apiErr, ok := apierror.FromError(err)
745 if !ok {
746 t.Errorf("expected apierror, got %T: %v", err, err)
747 }
748 se := &storagepb.StorageError{}
749 e := apiErr.Details().ExtractProtoMessage(se)
750 if e != nil {
751 t.Errorf("expected storage error, but extraction failed: %v", e)
752 }
753 wantCode := storagepb.StorageError_OFFSET_OUT_OF_RANGE
754 if se.GetCode() != wantCode {
755 t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
756 }
757
758 result, err = ms.AppendRows(ctx, data, WithOffset(0))
759 if err != nil {
760 t.Errorf("failed to send append: %v", err)
761 }
762 off, err = result.GetResult(ctx)
763 if err != nil {
764 t.Errorf("expected offset, got error %v", err)
765 }
766 wantOffset := int64(0)
767 if off != wantOffset {
768 t.Errorf("offset mismatch, got %d want %d", off, wantOffset)
769 }
770
771 result, err = ms.AppendRows(ctx, data, WithOffset(0))
772 if err != nil {
773 t.Errorf("failed to send append: %v", err)
774 }
775 off, err = result.GetResult(ctx)
776 if err == nil {
777 t.Errorf("expected error, got offset %d", off)
778 }
779 apiErr, ok = apierror.FromError(err)
780 if !ok {
781 t.Errorf("expected apierror, got %T: %v", err, err)
782 }
783 se = &storagepb.StorageError{}
784 e = apiErr.Details().ExtractProtoMessage(se)
785 if e != nil {
786 t.Errorf("expected storage error, but extraction failed: %v", e)
787 }
788 wantCode = storagepb.StorageError_OFFSET_ALREADY_EXISTS
789 if se.GetCode() != wantCode {
790 t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
791 }
792
793 if _, err := ms.Finalize(ctx); err != nil {
794 t.Errorf("Finalize had error: %v", err)
795 }
796
797 result, err = ms.AppendRows(ctx, data)
798 if err != nil {
799 t.Errorf("failed to send append: %v", err)
800 }
801 off, err = result.GetResult(ctx)
802 if err == nil {
803 t.Errorf("expected error, got offset %d", off)
804 }
805 apiErr, ok = apierror.FromError(err)
806 if !ok {
807 t.Errorf("expected apierror, got %T: %v", err, err)
808 }
809 se = &storagepb.StorageError{}
810 e = apiErr.Details().ExtractProtoMessage(se)
811 if e != nil {
812 t.Errorf("expected storage error, but extraction failed: %v", e)
813 }
814 wantCode = storagepb.StorageError_STREAM_FINALIZED
815 if se.GetCode() != wantCode {
816 t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
817 }
818 }
819
820 func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
821 testTable := dataset.Table(tableIDs.New())
822 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
823 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
824 }
825
826 m := &testdata.SimpleMessageProto2{}
827 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
828
829 ms, err := mwClient.NewManagedStream(ctx,
830 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
831 WithType(PendingStream),
832 WithSchemaDescriptor(descriptorProto),
833 )
834 if err != nil {
835 t.Fatalf("NewManagedStream: %v", err)
836 }
837 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
838 withExactRowCount(0))
839
840
841 var result *AppendResult
842 for k, mesg := range testSimpleData {
843 b, err := proto.Marshal(mesg)
844 if err != nil {
845 t.Errorf("failed to marshal message %d: %v", k, err)
846 }
847 data := [][]byte{b}
848 result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
849 if err != nil {
850 t.Errorf("single-row append %d failed: %v", k, err)
851 }
852
853 off, err := result.GetResult(ctx)
854 if err != nil {
855 t.Errorf("response %d error: %v", k, err)
856 }
857 if off != int64(k) {
858 t.Errorf("offset mismatch, got %d want %d", off, k)
859 }
860 }
861 wantRows := int64(len(testSimpleData))
862
863
864 trackedOffset, err := ms.Finalize(ctx)
865 if err != nil {
866 t.Errorf("Finalize: %v", err)
867 }
868
869 if trackedOffset != wantRows {
870 t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
871 }
872
873
874 req := &storagepb.BatchCommitWriteStreamsRequest{
875 Parent: TableParentFromStreamName(ms.StreamName()),
876 WriteStreams: []string{ms.StreamName()},
877 }
878
879 resp, err := mwClient.BatchCommitWriteStreams(ctx, req)
880 if err != nil {
881 t.Errorf("client.BatchCommit: %v", err)
882 }
883 if len(resp.StreamErrors) > 0 {
884 t.Errorf("stream errors present: %v", resp.StreamErrors)
885 }
886 validateTableConstraints(ctx, t, bqClient, testTable, "after send",
887 withExactRowCount(int64(len(testSimpleData))))
888 }
889
890 func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
891 testTable := dataset.Table(tableIDs.New())
892 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
893 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
894 }
895
896 m := &testdata.SimpleMessageProto2{}
897 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
898
899 ms, err := mwClient.NewManagedStream(ctx,
900 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
901 WithType(CommittedStream),
902 WithSchemaDescriptor(descriptorProto),
903 )
904 if err != nil {
905 t.Fatalf("NewManagedStream: %v", err)
906 }
907 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
908 withExactRowCount(0))
909
910
911 var data [][]byte
912 targetSize := 11 * 1024 * 1024
913 b, err := proto.Marshal(testSimpleData[0])
914 if err != nil {
915 t.Errorf("failed to marshal message: %v", err)
916 }
917
918 numRows := targetSize / len(b)
919 data = make([][]byte, numRows)
920
921 for i := 0; i < numRows; i++ {
922 data[i] = b
923 }
924
925 result, err := ms.AppendRows(ctx, data, WithOffset(0))
926 if err != nil {
927 t.Errorf("single append failed: %v", err)
928 }
929 _, err = result.GetResult(ctx)
930 if err != nil {
931 apiErr, ok := apierror.FromError(err)
932 if !ok {
933 t.Errorf("GetResult error was not an instance of ApiError")
934 }
935 status := apiErr.GRPCStatus()
936 if status.Code() != codes.InvalidArgument {
937 t.Errorf("expected InvalidArgument status, got %v", status)
938 }
939 }
940
941 result, err = ms.AppendRows(ctx, [][]byte{b})
942 if err != nil {
943 t.Fatalf("second append failed: %v", err)
944 }
945 _, err = result.GetResult(ctx)
946 if err != nil {
947 t.Errorf("failure result from second append: %v", err)
948 }
949
950 validateTableConstraints(ctx, t, bqClient, testTable, "final",
951 withExactRowCount(1))
952 }
953
954 func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
955 testTable := dataset.Table(tableIDs.New())
956 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
957 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
958 }
959
960 m := &testdata.SimpleMessageProto2{}
961 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
962
963 ms, err := mwClient.NewManagedStream(ctx,
964 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
965 WithType(CommittedStream),
966 WithSchemaDescriptor(descriptorProto),
967 EnableWriteRetries(true),
968 )
969 if err != nil {
970 t.Fatalf("NewManagedStream: %v", err)
971 }
972 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
973 withExactRowCount(0))
974
975
976 var data [][]byte
977 targetSize := 11 * 1024 * 1024
978 b, err := proto.Marshal(testSimpleData[0])
979 if err != nil {
980 t.Errorf("failed to marshal message: %v", err)
981 }
982
983 numRows := targetSize / len(b)
984 data = make([][]byte, numRows)
985
986 for i := 0; i < numRows; i++ {
987 data[i] = b
988 }
989
990 result, err := ms.AppendRows(ctx, data, WithOffset(0))
991 if err != nil {
992 t.Errorf("single append failed: %v", err)
993 }
994 _, err = result.GetResult(ctx)
995 if err != nil {
996 apiErr, ok := apierror.FromError(err)
997 if !ok {
998 t.Errorf("GetResult error was not an instance of ApiError")
999 }
1000 status := apiErr.GRPCStatus()
1001 if status.Code() != codes.InvalidArgument {
1002 t.Errorf("expected InvalidArgument status, got %v", status)
1003 }
1004 }
1005
1006
1007 result, err = ms.AppendRows(ctx, [][]byte{b})
1008 if err != nil {
1009 t.Fatalf("second append expected to succeed, got error: %v", err)
1010 }
1011 _, err = result.GetResult(ctx)
1012 if err != nil {
1013 t.Errorf("failure result from second append: %v", err)
1014 }
1015 if attempts, _ := result.TotalAttempts(ctx); attempts != 1 {
1016 t.Errorf("expected 1 attempts, got %d", attempts)
1017 }
1018 }
1019
1020 func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
1021 testedViews := []*view.View{
1022 AppendRequestsView,
1023 AppendResponsesView,
1024 AppendClientOpenView,
1025 }
1026
1027 if err := view.Register(testedViews...); err != nil {
1028 t.Fatalf("couldn't register views: %v", err)
1029 }
1030
1031 testTable := dataset.Table(tableIDs.New())
1032 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
1033 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
1034 }
1035
1036 m := &testdata.SimpleMessageProto2{}
1037 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
1038
1039
1040 ms, err := mwClient.NewManagedStream(ctx,
1041 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
1042 WithType(DefaultStream),
1043 WithSchemaDescriptor(descriptorProto),
1044 )
1045 if err != nil {
1046 t.Fatalf("NewManagedStream: %v", err)
1047 }
1048
1049 var result *AppendResult
1050 for k, mesg := range testSimpleData {
1051 b, err := proto.Marshal(mesg)
1052 if err != nil {
1053 t.Errorf("failed to marshal message %d: %v", k, err)
1054 }
1055 data := [][]byte{b}
1056 result, err = ms.AppendRows(ctx, data)
1057 if err != nil {
1058 t.Errorf("single-row append %d failed: %v", k, err)
1059 }
1060 }
1061
1062 result.Ready()
1063
1064
1065 time.Sleep(time.Second)
1066
1067
1068 wantTags := map[string][]string{
1069 "cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count": {"error"},
1070 "cloud.google.com/go/bigquery/storage/managedwriter/stream_open_retry_count": nil,
1071 "cloud.google.com/go/bigquery/storage/managedwriter/append_requests": {"streamID"},
1072 "cloud.google.com/go/bigquery/storage/managedwriter/append_request_bytes": {"streamID"},
1073 "cloud.google.com/go/bigquery/storage/managedwriter/append_request_errors": {"streamID"},
1074 "cloud.google.com/go/bigquery/storage/managedwriter/append_rows": {"streamID"},
1075 }
1076
1077 for _, tv := range testedViews {
1078
1079 metricData, err := func() ([]*view.Row, error) {
1080 attempt := 0
1081 for {
1082 data, err := view.RetrieveData(tv.Name)
1083 attempt = attempt + 1
1084 if attempt > 5 {
1085 return data, err
1086 }
1087 if err == nil && len(data) == 1 {
1088 return data, err
1089 }
1090 time.Sleep(time.Duration(attempt) * 500 * time.Millisecond)
1091 }
1092 }()
1093 if err != nil {
1094 t.Errorf("view %q RetrieveData: %v", tv.Name, err)
1095 }
1096 if mlen := len(metricData); mlen != 1 {
1097 t.Errorf("%q: expected 1 row of metrics, got %d", tv.Name, mlen)
1098 continue
1099 }
1100 if wantKeys, ok := wantTags[tv.Name]; ok {
1101 if wantKeys == nil {
1102 if n := len(tv.TagKeys); n != 0 {
1103 t.Errorf("expected view %q to have no keys, but %d present", tv.Name, n)
1104 }
1105 } else {
1106 for _, wk := range wantKeys {
1107 var found bool
1108 for _, gk := range tv.TagKeys {
1109 if gk.Name() == wk {
1110 found = true
1111 break
1112 }
1113 }
1114 if !found {
1115 t.Errorf("expected view %q to have key %q, but wasn't present", tv.Name, wk)
1116 }
1117 }
1118 }
1119 }
1120 entry := metricData[0].Data
1121 sum, ok := entry.(*view.SumData)
1122 if !ok {
1123 t.Errorf("unexpected metric type: %T", entry)
1124 }
1125 got := sum.Value
1126 var want int64
1127 switch tv {
1128 case AppendRequestsView:
1129 want = int64(len(testSimpleData))
1130 case AppendResponsesView:
1131 want = int64(len(testSimpleData))
1132 case AppendClientOpenView:
1133 want = 1
1134 }
1135
1136
1137 if math.Abs(got-float64(want)) > 0.1 {
1138 t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want)
1139 }
1140 }
1141 }
1142
1143 func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
1144 testTable := dataset.Table(tableIDs.New())
1145 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
1146 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
1147 }
1148
1149 m := &testdata.SimpleMessageProto2{}
1150 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
1151
1152
1153 opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
1154 opts = append(opts, WithSchemaDescriptor(descriptorProto))
1155 ms, err := mwClient.NewManagedStream(ctx, opts...)
1156 if err != nil {
1157 t.Fatalf("NewManagedStream: %v", err)
1158 }
1159 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
1160 withExactRowCount(0))
1161
1162 var result *AppendResult
1163 var curOffset int64
1164 var latestRow []byte
1165 for k, mesg := range testSimpleData {
1166 b, err := proto.Marshal(mesg)
1167 if err != nil {
1168 t.Errorf("failed to marshal message %d: %v", k, err)
1169 }
1170 latestRow = b
1171 data := [][]byte{b}
1172 result, err = ms.AppendRows(ctx, data)
1173 if err != nil {
1174 t.Errorf("single-row append %d failed: %v", k, err)
1175 }
1176 curOffset = curOffset + int64(len(data))
1177 }
1178
1179 _, err = result.GetResult(ctx)
1180 if err != nil {
1181 t.Errorf("error on append: %v", err)
1182 }
1183
1184 validateTableConstraints(ctx, t, bqClient, testTable, "after send",
1185 withExactRowCount(int64(len(testSimpleData))))
1186
1187
1188 _, err = testTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: testdata.SimpleMessageEvolvedSchema}, "")
1189 if err != nil {
1190 t.Errorf("failed to evolve table schema: %v", err)
1191 }
1192
1193
1194
1195
1196
1197
1198
1199
1200 for {
1201 resp, err := ms.AppendRows(ctx, [][]byte{latestRow})
1202 if err != nil {
1203 t.Errorf("got error on dupe append: %v", err)
1204 break
1205 }
1206 curOffset = curOffset + 1
1207 s, err := resp.UpdatedSchema(ctx)
1208 if err != nil {
1209 t.Errorf("getting schema error: %v", err)
1210 }
1211 if s != nil {
1212 break
1213 }
1214 }
1215
1216
1217 m2 := &testdata.SimpleMessageEvolvedProto2{
1218 Name: proto.String("evolved"),
1219 Value: proto.Int64(180),
1220 Other: proto.String("hello evolution"),
1221 }
1222 descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())
1223 b, err := proto.Marshal(m2)
1224 if err != nil {
1225 t.Errorf("failed to marshal evolved message: %v", err)
1226 }
1227
1228 res, err := ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
1229 if err != nil {
1230 t.Errorf("failed evolved append: %v", err)
1231 }
1232 _, err = res.GetResult(ctx)
1233 if err != nil {
1234 t.Errorf("error on evolved append: %v", err)
1235 }
1236 curOffset = curOffset + 1
1237
1238
1239
1240 var wg sync.WaitGroup
1241 for i := 0; i < 5; i++ {
1242 id := i
1243 wg.Add(1)
1244 go func() {
1245 res, err := ms.AppendRows(ctx, [][]byte{b})
1246 if err != nil {
1247 t.Errorf("failed concurrent append %d: %v", id, err)
1248 }
1249 _, err = res.GetResult(ctx)
1250 if err != nil {
1251 t.Errorf("error on concurrent append %d: %v", id, err)
1252 }
1253 wg.Done()
1254 }()
1255 }
1256 wg.Wait()
1257
1258 validateTableConstraints(ctx, t, bqClient, testTable, "after evolved records send",
1259 withExactRowCount(int64(curOffset+5)),
1260 withNullCount("name", 0),
1261 withNonNullCount("other", 6),
1262 )
1263 }
1264
1265 func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
1266 testTable := dataset.Table(tableIDs.New())
1267 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil {
1268 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
1269 }
1270
1271 m := &testdata.DefaultValuesPartialSchema{
1272
1273 Id: proto.String("someval"),
1274 }
1275 var data []byte
1276 var err error
1277 if data, err = proto.Marshal(m); err != nil {
1278 t.Fatalf("failed to marshal test row data")
1279 }
1280 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
1281
1282
1283 opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
1284 opts = append(opts, WithSchemaDescriptor(descriptorProto))
1285 ms, err := mwClient.NewManagedStream(ctx, opts...)
1286 if err != nil {
1287 t.Fatalf("NewManagedStream: %v", err)
1288 }
1289 validateTableConstraints(ctx, t, bqClient, testTable, "before send",
1290 withExactRowCount(0))
1291
1292 var result *AppendResult
1293
1294
1295
1296 result, err = ms.AppendRows(ctx, [][]byte{data})
1297 if err != nil {
1298 t.Errorf("append failed: %v", err)
1299 }
1300
1301 _, err = result.GetResult(ctx)
1302 if err != nil {
1303 t.Errorf("error on append: %v", err)
1304 }
1305
1306 validateTableConstraints(ctx, t, bqClient, testTable, "after first row",
1307 withExactRowCount(1),
1308 withNonNullCount("id", 1),
1309 withNullCount("strcol_withdef", 1),
1310 withNullCount("intcol_withdef", 1),
1311 withNullCount("otherstr_withdef", 0))
1312
1313
1314
1315
1316 result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE))
1317 if err != nil {
1318 t.Errorf("append failed: %v", err)
1319 }
1320
1321 _, err = result.GetResult(ctx)
1322 if err != nil {
1323 t.Errorf("error on append: %v", err)
1324 }
1325
1326 validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)",
1327 withExactRowCount(2),
1328 withNullCount("strcol_withdef", 1),
1329 withNullCount("intcol_withdef", 1))
1330
1331
1332 result, err = ms.AppendRows(ctx, [][]byte{data},
1333 UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
1334 "strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE,
1335 }))
1336 if err != nil {
1337 t.Errorf("append failed: %v", err)
1338 }
1339
1340 _, err = result.GetResult(ctx)
1341 if err != nil {
1342 t.Errorf("error on append: %v", err)
1343 }
1344
1345 validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)",
1346 withExactRowCount(3),
1347 withNullCount("strcol_withdef", 2),
1348 withNullCount("intcol_withdef", 1),
1349 withNonNullCount("otherstr_withdef", 3),
1350 withNullCount("otherstr", 3),
1351 withNullCount("strcol", 3),
1352 withNullCount("intcol", 3),
1353 )
1354 }
1355
1356 func TestIntegration_DetectProjectID(t *testing.T) {
1357 ctx := context.Background()
1358 testCreds := testutil.Credentials(ctx)
1359 if testCreds == nil {
1360 t.Skip("test credentials not present, skipping")
1361 }
1362
1363 if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil {
1364 t.Errorf("test NewClient: %v", err)
1365 }
1366
1367 badTS := testutil.ErroringTokenSource{}
1368
1369 if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
1370 t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID)
1371 }
1372 }
1373
1374 func TestIntegration_ProtoNormalization(t *testing.T) {
1375 mwClient, bqClient := getTestClients(context.Background(), t)
1376 defer mwClient.Close()
1377 defer bqClient.Close()
1378
1379 dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
1380 if err != nil {
1381 t.Fatalf("failed to init test dataset: %v", err)
1382 }
1383 defer cleanup()
1384
1385 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1386 defer cancel()
1387
1388 t.Run("group", func(t *testing.T) {
1389 t.Run("ComplexType", func(t *testing.T) {
1390 t.Parallel()
1391 schema := testdata.ComplexTypeSchema
1392 mesg := &testdata.ComplexType{
1393 NestedRepeatedType: []*testdata.NestedType{
1394 {
1395 InnerType: []*testdata.InnerType{
1396 {Value: []string{"a", "b", "c"}},
1397 {Value: []string{"x", "y", "z"}},
1398 },
1399 },
1400 },
1401 InnerType: &testdata.InnerType{
1402 Value: []string{"top"},
1403 },
1404 RangeType: &testdata.RangeTypeTimestamp{
1405 End: proto.Int64(time.Now().UnixNano()),
1406 },
1407 }
1408 b, err := proto.Marshal(mesg)
1409 if err != nil {
1410 t.Fatalf("proto.Marshal: %v", err)
1411 }
1412 descriptor := (mesg).ProtoReflect().Descriptor()
1413 testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b)
1414 })
1415 t.Run("WithWellKnownTypes", func(t *testing.T) {
1416 t.Parallel()
1417 schema := testdata.WithWellKnownTypesSchema
1418 mesg := &testdata.WithWellKnownTypes{
1419 Int64Value: proto.Int64(123),
1420 WrappedInt64: &wrapperspb.Int64Value{
1421 Value: 456,
1422 },
1423 StringValue: []string{"a", "b"},
1424 WrappedString: []*wrapperspb.StringValue{
1425 {Value: "foo"},
1426 {Value: "bar"},
1427 },
1428 }
1429 b, err := proto.Marshal(mesg)
1430 if err != nil {
1431 t.Fatalf("proto.Marshal: %v", err)
1432 }
1433 descriptor := (mesg).ProtoReflect().Descriptor()
1434 testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b)
1435 })
1436 t.Run("WithExternalEnum", func(t *testing.T) {
1437 t.Parallel()
1438 schema := testdata.ExternalEnumMessageSchema
1439 mesg := &testdata.ExternalEnumMessage{
1440 MsgA: &testdata.EnumMsgA{
1441 Foo: proto.String("foo"),
1442 Bar: testdata.ExtEnum_THING.Enum(),
1443 },
1444 MsgB: &testdata.EnumMsgB{
1445 Baz: testdata.ExtEnum_OTHER_THING.Enum(),
1446 },
1447 }
1448 b, err := proto.Marshal(mesg)
1449 if err != nil {
1450 t.Fatalf("proto.Marshal: %v", err)
1451 }
1452 descriptor := (mesg).ProtoReflect().Descriptor()
1453 testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b)
1454 })
1455 })
1456 }
1457
1458 func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, schema bigquery.Schema, descriptor protoreflect.MessageDescriptor, sampleRow []byte) {
1459 testTable := dataset.Table(tableIDs.New())
1460 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
1461 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
1462 }
1463
1464 dp, err := adapt.NormalizeDescriptor(descriptor)
1465 if err != nil {
1466 t.Fatalf("NormalizeDescriptor: %v", err)
1467 }
1468
1469
1470 ms, err := mwClient.NewManagedStream(ctx,
1471 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
1472 WithType(DefaultStream),
1473 WithSchemaDescriptor(dp),
1474 )
1475 if err != nil {
1476 t.Fatalf("NewManagedStream: %v", err)
1477 }
1478 result, err := ms.AppendRows(ctx, [][]byte{sampleRow})
1479 if err != nil {
1480 t.Errorf("append failed: %v", err)
1481 }
1482
1483 _, err = result.GetResult(ctx)
1484 if err != nil {
1485 t.Errorf("error in response: %v", err)
1486 }
1487 }
1488
1489 func TestIntegration_MultiplexWrites(t *testing.T) {
1490 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1491 defer cancel()
1492 mwClient, bqClient := getTestClients(ctx, t,
1493 WithMultiplexing(),
1494 WithMultiplexPoolLimit(2),
1495 )
1496 defer mwClient.Close()
1497 defer bqClient.Close()
1498
1499 dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, "us-east1")
1500 if err != nil {
1501 t.Fatalf("failed to init test dataset: %v", err)
1502 }
1503 defer cleanup()
1504
1505 wantWrites := 10
1506
1507 testTables := []struct {
1508 tbl *bigquery.Table
1509 schema bigquery.Schema
1510 dp *descriptorpb.DescriptorProto
1511 sampleRow []byte
1512 constraints []constraintOption
1513 }{
1514 {
1515 tbl: dataset.Table(tableIDs.New()),
1516 schema: testdata.SimpleMessageSchema,
1517 dp: func() *descriptorpb.DescriptorProto {
1518 m := &testdata.SimpleMessageProto2{}
1519 dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
1520 return dp
1521 }(),
1522 sampleRow: func() []byte {
1523 msg := &testdata.SimpleMessageProto2{
1524 Name: proto.String("sample_name"),
1525 Value: proto.Int64(1001),
1526 }
1527 b, _ := proto.Marshal(msg)
1528 return b
1529 }(),
1530 constraints: []constraintOption{
1531 withExactRowCount(int64(wantWrites)),
1532 withStringValueCount("name", "sample_name", int64(wantWrites)),
1533 },
1534 },
1535 {
1536 tbl: dataset.Table(tableIDs.New()),
1537 schema: testdata.ValidationBaseSchema,
1538 dp: func() *descriptorpb.DescriptorProto {
1539 m := &testdata.ValidationP2Optional{}
1540 dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
1541 return dp
1542 }(),
1543 sampleRow: func() []byte {
1544 msg := &testdata.ValidationP2Optional{
1545 Int64Field: proto.Int64(69),
1546 StringField: proto.String("validation_string"),
1547 }
1548 b, _ := proto.Marshal(msg)
1549 return b
1550 }(),
1551 constraints: []constraintOption{
1552 withExactRowCount(int64(wantWrites)),
1553 withStringValueCount("string_field", "validation_string", int64(wantWrites)),
1554 },
1555 },
1556 {
1557 tbl: dataset.Table(tableIDs.New()),
1558 schema: testdata.GithubArchiveSchema,
1559 dp: func() *descriptorpb.DescriptorProto {
1560 m := &testdata.GithubArchiveMessageProto2{}
1561 dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
1562 return dp
1563 }(),
1564 sampleRow: func() []byte {
1565 msg := &testdata.GithubArchiveMessageProto2{
1566 Payload: proto.String("payload_string"),
1567 Id: proto.String("some_id"),
1568 }
1569 b, _ := proto.Marshal(msg)
1570 return b
1571 }(),
1572 constraints: []constraintOption{
1573 withExactRowCount(int64(wantWrites)),
1574 withStringValueCount("payload", "payload_string", int64(wantWrites)),
1575 },
1576 },
1577 }
1578
1579
1580 for _, testTable := range testTables {
1581 if err := testTable.tbl.Create(ctx, &bigquery.TableMetadata{Schema: testTable.schema}); err != nil {
1582 t.Fatalf("failed to create test table %q: %v", testTable.tbl.FullyQualifiedName(), err)
1583 }
1584 }
1585
1586 var gotFirstPool *connectionPool
1587 var results []*AppendResult
1588 for i := 0; i < wantWrites; i++ {
1589 for k, testTable := range testTables {
1590
1591 ms, err := mwClient.NewManagedStream(ctx,
1592 WithDestinationTable(TableParentFromParts(testTable.tbl.ProjectID, testTable.tbl.DatasetID, testTable.tbl.TableID)),
1593 WithType(DefaultStream),
1594 WithSchemaDescriptor(testTable.dp),
1595 EnableWriteRetries(true),
1596 )
1597 if err != nil {
1598 t.Fatalf("NewManagedStream %d: %v", k, err)
1599 }
1600 if i == 0 && k == 0 {
1601 if ms.pool == nil {
1602 t.Errorf("expected a non-nil pool reference for first writer")
1603 }
1604 gotFirstPool = ms.pool
1605 } else {
1606 if ms.pool != gotFirstPool {
1607 t.Errorf("expected same pool reference, got a different pool")
1608 }
1609 }
1610 defer ms.Close()
1611 if err != nil {
1612 t.Fatalf("failed to create ManagedStream for table %d on iteration %d: %v", k, i, err)
1613 }
1614 res, err := ms.AppendRows(ctx, [][]byte{testTable.sampleRow})
1615 if err != nil {
1616 t.Fatalf("failed to append to table %d on iteration %d: %v", k, i, err)
1617 }
1618 results = append(results, res)
1619 }
1620 }
1621
1622
1623 for k, res := range results {
1624 if _, err := res.GetResult(ctx); err != nil {
1625 t.Errorf("result %d yielded error: %v", k, err)
1626 }
1627 }
1628
1629
1630 for _, testTable := range testTables {
1631 validateTableConstraints(ctx, t, bqClient, testTable.tbl, "", testTable.constraints...)
1632 }
1633
1634 }
1635
1636 func TestIntegration_MingledContexts(t *testing.T) {
1637 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1638 defer cancel()
1639 mwClient, bqClient := getTestClients(ctx, t,
1640 WithMultiplexing(),
1641 WithMultiplexPoolLimit(2),
1642 )
1643 defer mwClient.Close()
1644 defer bqClient.Close()
1645
1646 wantLocation := "us-east4"
1647
1648 dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
1649 if err != nil {
1650 t.Fatalf("failed to init test dataset: %v", err)
1651 }
1652 defer cleanup()
1653
1654 testTable := dataset.Table(tableIDs.New())
1655 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
1656 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
1657 }
1658
1659 m := &testdata.SimpleMessageProto2{}
1660 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
1661
1662 numWriters := 4
1663 contexts := make([]context.Context, numWriters)
1664 cancels := make([]context.CancelFunc, numWriters)
1665 writers := make([]*ManagedStream, numWriters)
1666 for i := 0; i < numWriters; i++ {
1667 contexts[i], cancels[i] = context.WithCancel(ctx)
1668 ms, err := mwClient.NewManagedStream(contexts[i],
1669 WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
1670 WithType(DefaultStream),
1671 WithSchemaDescriptor(descriptorProto),
1672 )
1673 if err != nil {
1674 t.Fatalf("instantating writer %d failed: %v", i, err)
1675 }
1676 writers[i] = ms
1677 }
1678
1679 sampleRow, err := proto.Marshal(&testdata.SimpleMessageProto2{
1680 Name: proto.String("datafield"),
1681 Value: proto.Int64(1234),
1682 })
1683 if err != nil {
1684 t.Fatalf("failed to generate sample row")
1685 }
1686
1687 for i := 0; i < numWriters; i++ {
1688 res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
1689 if err != nil {
1690 t.Errorf("initial write on %d failed: %v", i, err)
1691 } else {
1692 if _, err := res.GetResult(contexts[i]); err != nil {
1693 t.Errorf("GetResult initial write %d: %v", i, err)
1694 }
1695 }
1696 }
1697
1698
1699 cancels[0]()
1700
1701 for i := 1; i < numWriters; i++ {
1702 res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
1703 if err != nil {
1704 t.Errorf("second write on %d failed: %v", i, err)
1705 } else {
1706 if _, err := res.GetResult(contexts[1]); err != nil {
1707 t.Errorf("GetResult err on second write %d: %v", i, err)
1708 }
1709 }
1710 }
1711
1712
1713 if _, err := writers[0].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
1714 t.Errorf("write succeeded on first writer when it should have failed")
1715 }
1716
1717
1718 cancels[1]()
1719 if _, err := writers[2].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
1720 t.Errorf("write succeeded on third writer with a bad request context")
1721 }
1722
1723
1724 for i := 2; i < numWriters; i++ {
1725 res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
1726 if err != nil {
1727 t.Errorf("second write on %d failed: %v", i, err)
1728 } else {
1729 if _, err := res.GetResult(contexts[i]); err != nil {
1730 t.Errorf("GetResult err on second write %d: %v", i, err)
1731 }
1732 }
1733 }
1734 }
1735
View as plain text