...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/integration_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"math"
    21  	"sync"
    22  	"testing"
    23  	"time"
    24  
    25  	"cloud.google.com/go/bigquery"
    26  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    27  	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    28  	"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
    29  	"cloud.google.com/go/internal/testutil"
    30  	"cloud.google.com/go/internal/uid"
    31  	"github.com/google/go-cmp/cmp"
    32  	"github.com/googleapis/gax-go/v2/apierror"
    33  	"go.opencensus.io/stats/view"
    34  	"google.golang.org/api/option"
    35  	"google.golang.org/grpc/codes"
    36  	"google.golang.org/protobuf/encoding/protojson"
    37  	"google.golang.org/protobuf/proto"
    38  	"google.golang.org/protobuf/reflect/protodesc"
    39  	"google.golang.org/protobuf/reflect/protoreflect"
    40  	"google.golang.org/protobuf/testing/protocmp"
    41  	"google.golang.org/protobuf/types/descriptorpb"
    42  	"google.golang.org/protobuf/types/dynamicpb"
    43  	"google.golang.org/protobuf/types/known/wrapperspb"
    44  )
    45  
var (
	// datasetIDs and tableIDs generate unique resource names so concurrent
	// test runs don't collide; the timestamp in the options aids identifying
	// (and manually cleaning up) leaked resources.
	datasetIDs         = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
	tableIDs           = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
	defaultTestTimeout = 90 * time.Second
)
    51  
// testSimpleData is the shared row fixture for the simple-message tests.
// Cardinality: 5 distinct names, 3 distinct values (values repeat 1,2,3,1,2);
// several tests assert these counts via withDistinctValues.
var testSimpleData = []*testdata.SimpleMessageProto2{
	{Name: proto.String("one"), Value: proto.Int64(1)},
	{Name: proto.String("two"), Value: proto.Int64(2)},
	{Name: proto.String("three"), Value: proto.Int64(3)},
	{Name: proto.String("four"), Value: proto.Int64(1)},
	{Name: proto.String("five"), Value: proto.Int64(2)},
}
    60  
    61  func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
    62  	if testing.Short() {
    63  		t.Skip("Integration tests skipped in short mode")
    64  	}
    65  	projID := testutil.ProjID()
    66  	if projID == "" {
    67  		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
    68  	}
    69  	ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery")
    70  	if ts == nil {
    71  		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
    72  	}
    73  	opts = append(opts, option.WithTokenSource(ts))
    74  	client, err := NewClient(ctx, projID, opts...)
    75  	if err != nil {
    76  		t.Fatalf("couldn't create managedwriter client: %v", err)
    77  	}
    78  
    79  	bqClient, err := bigquery.NewClient(ctx, projID, opts...)
    80  	if err != nil {
    81  		t.Fatalf("couldn't create bigquery client: %v", err)
    82  	}
    83  	return client, bqClient
    84  }
    85  
    86  // setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred.
    87  func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) {
    88  	dataset := bqc.Dataset(datasetIDs.New())
    89  	if err := dataset.Create(ctx, &bigquery.DatasetMetadata{Location: location}); err != nil {
    90  		return nil, nil, err
    91  	}
    92  	return dataset, func() {
    93  		if err := dataset.DeleteWithContents(ctx); err != nil {
    94  			t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err)
    95  		}
    96  	}, nil
    97  }
    98  
    99  // setupDynamicDescriptors aids testing when not using a supplied proto
   100  func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
   101  	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
   102  	if err != nil {
   103  		t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
   104  	}
   105  
   106  	descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root")
   107  	if err != nil {
   108  		t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err)
   109  	}
   110  	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
   111  	if !ok {
   112  		t.Fatalf("adapted descriptor is not a message descriptor")
   113  	}
   114  	dp, err := adapt.NormalizeDescriptor(messageDescriptor)
   115  	if err != nil {
   116  		t.Fatalf("NormalizeDescriptor: %v", err)
   117  	}
   118  	return messageDescriptor, dp
   119  }
   120  
   121  func TestIntegration_ClientGetWriteStream(t *testing.T) {
   122  	ctx := context.Background()
   123  	mwClient, bqClient := getTestClients(ctx, t)
   124  	defer mwClient.Close()
   125  	defer bqClient.Close()
   126  
   127  	wantLocation := "us-east1"
   128  	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
   129  	if err != nil {
   130  		t.Fatalf("failed to init test dataset: %v", err)
   131  	}
   132  	defer cleanup()
   133  
   134  	testTable := dataset.Table(tableIDs.New())
   135  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
   136  		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
   137  	}
   138  
   139  	apiSchema, _ := adapt.BQSchemaToStorageTableSchema(testdata.SimpleMessageSchema)
   140  	parent := TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)
   141  	explicitStream, err := mwClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
   142  		Parent: parent,
   143  		WriteStream: &storagepb.WriteStream{
   144  			Type: storagepb.WriteStream_PENDING,
   145  		},
   146  	})
   147  	if err != nil {
   148  		t.Fatalf("CreateWriteStream: %v", err)
   149  	}
   150  
   151  	testCases := []struct {
   152  		description string
   153  		isDefault   bool
   154  		streamID    string
   155  		wantType    storagepb.WriteStream_Type
   156  	}{
   157  		{
   158  			description: "default",
   159  			isDefault:   true,
   160  			streamID:    fmt.Sprintf("%s/streams/_default", parent),
   161  			wantType:    storagepb.WriteStream_COMMITTED,
   162  		},
   163  		{
   164  			description: "explicit pending",
   165  			streamID:    explicitStream.Name,
   166  			wantType:    storagepb.WriteStream_PENDING,
   167  		},
   168  	}
   169  
   170  	for _, tc := range testCases {
   171  		for _, fullView := range []bool{false, true} {
   172  			info, err := mwClient.getWriteStream(ctx, tc.streamID, fullView)
   173  			if err != nil {
   174  				t.Errorf("%s (%T): getWriteStream failed: %v", tc.description, fullView, err)
   175  			}
   176  			if info.GetType() != tc.wantType {
   177  				t.Errorf("%s (%T): got type %d, want type %d", tc.description, fullView, info.GetType(), tc.wantType)
   178  			}
   179  			if info.GetLocation() != wantLocation {
   180  				t.Errorf("%s (%T) view: got location %s, want location %s", tc.description, fullView, info.GetLocation(), wantLocation)
   181  			}
   182  			if info.GetCommitTime() != nil {
   183  				t.Errorf("%s (%T)expected empty commit time, got %v", tc.description, fullView, info.GetCommitTime())
   184  			}
   185  
   186  			if !tc.isDefault {
   187  				if info.GetCreateTime() == nil {
   188  					t.Errorf("%s (%T): expected create time, was empty", tc.description, fullView)
   189  				}
   190  			} else {
   191  				if info.GetCreateTime() != nil {
   192  					t.Errorf("%s (%T): expected empty time, got %v", tc.description, fullView, info.GetCreateTime())
   193  				}
   194  			}
   195  
   196  			if !fullView {
   197  				if info.GetTableSchema() != nil {
   198  					t.Errorf("%s (%T) basic view: expected no schema, was populated", tc.description, fullView)
   199  				}
   200  			} else {
   201  				if diff := cmp.Diff(info.GetTableSchema(), apiSchema, protocmp.Transform()); diff != "" {
   202  					t.Errorf("%s (%T) schema mismatch: -got, +want:\n%s", tc.description, fullView, diff)
   203  				}
   204  			}
   205  		}
   206  	}
   207  }
   208  
   209  func TestIntegration_ManagedWriter(t *testing.T) {
   210  	mwClient, bqClient := getTestClients(context.Background(), t)
   211  	defer mwClient.Close()
   212  	defer bqClient.Close()
   213  
   214  	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
   215  	if err != nil {
   216  		t.Fatalf("failed to init test dataset: %v", err)
   217  	}
   218  	defer cleanup()
   219  
   220  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   221  	defer cancel()
   222  
   223  	t.Run("group", func(t *testing.T) {
   224  		t.Run("DefaultStream", func(t *testing.T) {
   225  			t.Parallel()
   226  			testDefaultStream(ctx, t, mwClient, bqClient, dataset)
   227  		})
   228  		t.Run("DefaultStreamDynamicJSON", func(t *testing.T) {
   229  			t.Parallel()
   230  			testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
   231  		})
   232  		t.Run("CommittedStream", func(t *testing.T) {
   233  			t.Parallel()
   234  			testCommittedStream(ctx, t, mwClient, bqClient, dataset)
   235  		})
   236  		t.Run("ErrorBehaviors", func(t *testing.T) {
   237  			t.Parallel()
   238  			testErrorBehaviors(ctx, t, mwClient, bqClient, dataset)
   239  		})
   240  		t.Run("BufferedStream", func(t *testing.T) {
   241  			t.Parallel()
   242  			testBufferedStream(ctx, t, mwClient, bqClient, dataset)
   243  		})
   244  		t.Run("PendingStream", func(t *testing.T) {
   245  			t.Parallel()
   246  			testPendingStream(ctx, t, mwClient, bqClient, dataset)
   247  		})
   248  		t.Run("SimpleCDC", func(t *testing.T) {
   249  			t.Parallel()
   250  			testSimpleCDC(ctx, t, mwClient, bqClient, dataset)
   251  		})
   252  		t.Run("Instrumentation", func(t *testing.T) {
   253  			// Don't run this in parallel, we only want to collect stats from this subtest.
   254  			testInstrumentation(ctx, t, mwClient, bqClient, dataset)
   255  		})
   256  		t.Run("TestLargeInsertNoRetry", func(t *testing.T) {
   257  			testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset)
   258  		})
   259  		t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
   260  			testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
   261  		})
   262  		t.Run("DefaultValueHandling", func(t *testing.T) {
   263  			testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset)
   264  		})
   265  	})
   266  }
   267  
   268  func TestIntegration_SchemaEvolution(t *testing.T) {
   269  
   270  	testcases := []struct {
   271  		desc       string
   272  		clientOpts []option.ClientOption
   273  		writerOpts []WriterOption
   274  	}{
   275  		{
   276  			desc: "Simplex_Committed",
   277  			writerOpts: []WriterOption{
   278  				WithType(CommittedStream),
   279  			},
   280  		},
   281  		{
   282  			desc: "Simplex_Default",
   283  			writerOpts: []WriterOption{
   284  				WithType(DefaultStream),
   285  			},
   286  		},
   287  		{
   288  			desc: "Multiplex_Default",
   289  			clientOpts: []option.ClientOption{
   290  				WithMultiplexing(),
   291  				WithMultiplexPoolLimit(2),
   292  			},
   293  			writerOpts: []WriterOption{
   294  				WithType(DefaultStream),
   295  			},
   296  		},
   297  	}
   298  
   299  	for _, tc := range testcases {
   300  		mwClient, bqClient := getTestClients(context.Background(), t, tc.clientOpts...)
   301  		defer mwClient.Close()
   302  		defer bqClient.Close()
   303  
   304  		dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
   305  		if err != nil {
   306  			t.Fatalf("failed to init test dataset: %v", err)
   307  		}
   308  		defer cleanup()
   309  
   310  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   311  		defer cancel()
   312  		t.Run(tc.desc, func(t *testing.T) {
   313  			testSchemaEvolution(ctx, t, mwClient, bqClient, dataset, tc.writerOpts...)
   314  		})
   315  	}
   316  }
   317  
   318  func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
   319  	testTable := dataset.Table(tableIDs.New())
   320  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
   321  		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
   322  	}
   323  	// We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation
   324  	// to send as the stream's schema.
   325  	m := &testdata.SimpleMessageProto2{}
   326  	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
   327  
   328  	// setup a new stream.
   329  	ms, err := mwClient.NewManagedStream(ctx,
   330  		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
   331  		WithType(DefaultStream),
   332  		WithSchemaDescriptor(descriptorProto),
   333  	)
   334  	if err != nil {
   335  		t.Fatalf("NewManagedStream: %v", err)
   336  	}
   337  	if ms.id == "" {
   338  		t.Errorf("managed stream is missing ID")
   339  	}
   340  	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
   341  		withExactRowCount(0))
   342  
   343  	// First, send the test rows individually.
   344  	var result *AppendResult
   345  	for k, mesg := range testSimpleData {
   346  		b, err := proto.Marshal(mesg)
   347  		if err != nil {
   348  			t.Errorf("failed to marshal message %d: %v", k, err)
   349  		}
   350  		data := [][]byte{b}
   351  		result, err = ms.AppendRows(ctx, data)
   352  		if err != nil {
   353  			t.Errorf("single-row append %d failed: %v", k, err)
   354  		}
   355  	}
   356  	// Wait for the result to indicate ready, then validate.
   357  	o, err := result.GetResult(ctx)
   358  	if err != nil {
   359  		t.Errorf("result error for last send: %v", err)
   360  	}
   361  	if o != NoStreamOffset {
   362  		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
   363  	}
   364  	validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
   365  		withExactRowCount(int64(len(testSimpleData))),
   366  		withDistinctValues("name", int64(len(testSimpleData))))
   367  
   368  	// Now, send the test rows grouped into in a single append.
   369  	var data [][]byte
   370  	for k, mesg := range testSimpleData {
   371  		b, err := proto.Marshal(mesg)
   372  		if err != nil {
   373  			t.Errorf("failed to marshal message %d: %v", k, err)
   374  		}
   375  		data = append(data, b)
   376  	}
   377  	result, err = ms.AppendRows(ctx, data)
   378  	if err != nil {
   379  		t.Errorf("grouped-row append failed: %v", err)
   380  	}
   381  	// Wait for the result to indicate ready, then validate again.  Our total rows have increased, but
   382  	// cardinality should not.
   383  	o, err = result.GetResult(ctx)
   384  	if err != nil {
   385  		t.Errorf("result error for last send: %v", err)
   386  	}
   387  	if o != NoStreamOffset {
   388  		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
   389  	}
   390  	validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
   391  		withExactRowCount(int64(2*len(testSimpleData))),
   392  		withDistinctValues("name", int64(len(testSimpleData))),
   393  		withDistinctValues("value", int64(3)),
   394  	)
   395  }
   396  
// testDefaultStreamDynamicJSON exercises the default stream using a message
// descriptor generated at runtime from the table's BigQuery schema (via
// setupDynamicDescriptors) rather than a precompiled proto. Each sample JSON
// row is unmarshaled into a dynamicpb message and then serialized for append.
func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.GithubArchiveSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// md builds dynamic messages; descriptorProto is sent as the stream schema.
	md, descriptorProto := setupDynamicDescriptors(t, testdata.GithubArchiveSchema)

	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// Five rows with distinct "type" values; "public" alternates true/false
	// (two distinct values). The last row omits repo.id.
	sampleJSONData := [][]byte{
		[]byte(`{"type": "foo", "public": true, "repo": {"id": 99, "name": "repo_name_1", "url": "https://one.example.com"}}`),
		[]byte(`{"type": "bar", "public": false, "repo": {"id": 101, "name": "repo_name_2", "url": "https://two.example.com"}}`),
		[]byte(`{"type": "baz", "public": true, "repo": {"id": 456, "name": "repo_name_3", "url": "https://three.example.com"}}`),
		[]byte(`{"type": "wow", "public": false, "repo": {"id": 123, "name": "repo_name_4", "url": "https://four.example.com"}}`),
		[]byte(`{"type": "yay", "public": true, "repo": {"name": "repo_name_5", "url": "https://five.example.com"}}`),
	}

	var result *AppendResult
	for k, v := range sampleJSONData {
		message := dynamicpb.NewMessage(md)

		// First, json->proto message
		err = protojson.Unmarshal(v, message)
		if err != nil {
			t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
		}
		// Then, proto message -> bytes.
		b, err := proto.Marshal(message)
		if err != nil {
			t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
		}
		result, err = ms.AppendRows(ctx, [][]byte{b})
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
	}

	// Wait for the result to indicate ready, then validate.
	o, err := result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	if o != NoStreamOffset {
		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(sampleJSONData))),
		withDistinctValues("type", int64(len(sampleJSONData))),
		withDistinctValues("public", int64(2)))
}
   457  
// testBufferedStream exercises a BUFFERED-type stream: appended rows only
// become visible in the table after FlushRows advances the flush offset past
// them, which this test validates row by row.
func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// The precompiled test proto supplies the stream's schema descriptor.
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(BufferedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}

	// Confirm the backend reports the stream as the requested type.
	info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID, false)
	if err != nil {
		t.Errorf("couldn't get stream info: %v", err)
	}
	if info.GetType().String() != string(ms.StreamType()) {
		t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// expectedRows tracks how many rows should be visible: only rows at or
	// below the last flushed offset.
	var expectedRows int64
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		results, err := ms.AppendRows(ctx, data)
		if err != nil {
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
		// Wait for acknowledgement.
		offset, err := results.GetResult(ctx)
		if err != nil {
			t.Fatalf("got error from pending result %d: %v", k, err)
		}
		// The just-appended row is still buffered, so the visible count
		// should remain at the pre-append value.
		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k),
			withExactRowCount(expectedRows),
			withDistinctValues("name", expectedRows))

		// move offset and re-validate.
		flushOffset, err := ms.FlushRows(ctx, offset)
		if err != nil {
			t.Errorf("failed to flush offset to %d: %v", offset, err)
		}
		expectedRows = flushOffset + 1
		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k),
			withExactRowCount(expectedRows),
			withDistinctValues("name", expectedRows))
	}
}
   517  
   518  func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
   519  	testTable := dataset.Table(tableIDs.New())
   520  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
   521  		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
   522  	}
   523  
   524  	m := &testdata.SimpleMessageProto2{}
   525  	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
   526  
   527  	// setup a new stream.
   528  	ms, err := mwClient.NewManagedStream(ctx,
   529  		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
   530  		WithType(CommittedStream),
   531  		WithSchemaDescriptor(descriptorProto),
   532  	)
   533  	if err != nil {
   534  		t.Fatalf("NewManagedStream: %v", err)
   535  	}
   536  	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
   537  		withExactRowCount(0))
   538  
   539  	var result *AppendResult
   540  	for k, mesg := range testSimpleData {
   541  		b, err := proto.Marshal(mesg)
   542  		if err != nil {
   543  			t.Errorf("failed to marshal message %d: %v", k, err)
   544  		}
   545  		data := [][]byte{b}
   546  		result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
   547  		if err != nil {
   548  			t.Errorf("single-row append %d failed: %v", k, err)
   549  		}
   550  	}
   551  	// Wait for the result to indicate ready, then validate.
   552  	o, err := result.GetResult(ctx)
   553  	if err != nil {
   554  		t.Errorf("result error for last send: %v", err)
   555  	}
   556  	wantOffset := int64(len(testSimpleData) - 1)
   557  	if o != wantOffset {
   558  		t.Errorf("offset mismatch, got %d want %d", o, wantOffset)
   559  	}
   560  	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
   561  		withExactRowCount(int64(len(testSimpleData))))
   562  }
   563  
// testSimpleCDC demonstrates basic Change Data Capture (CDC) functionality.
// We add an initial set of rows to a table with an enforced primary key, then
// use CDC records (INSERT/UPSERT/DELETE markers in the XCHANGE_TYPE field) to
// apply updates. XCHANGE_TYPE presumably maps to the _CHANGE_TYPE
// pseudo-column — confirm against the testdata proto definition.
func testSimpleCDC(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())

	// CDC requires clustering on the key column(s).
	if err := testTable.Create(ctx, &bigquery.TableMetadata{
		Schema: testdata.ExampleEmployeeSchema,
		Clustering: &bigquery.Clustering{
			Fields: []string{"id"},
		},
	}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// Mark the primary key using an ALTER TABLE DDL.
	tableIdentifier, _ := testTable.Identifier(bigquery.StandardSQLID)
	sql := fmt.Sprintf("ALTER TABLE %s ADD PRIMARY KEY(id) NOT ENFORCED;", tableIdentifier)
	if _, err := bqClient.Query(sql).Read(ctx); err != nil {
		t.Fatalf("failed ALTER TABLE: %v", err)
	}

	m := &testdata.ExampleEmployeeCDC{}
	descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
	if err != nil {
		t.Fatalf("NormalizeDescriptor: %v", err)
	}

	// Setup an initial writer for sending initial inserts.
	writer, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer writer.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// Three seed rows, all marked as INSERT change records.
	initialEmployees := []*testdata.ExampleEmployeeCDC{
		{
			Id:           proto.Int64(1),
			Username:     proto.String("alice"),
			GivenName:    proto.String("Alice CEO"),
			Departments:  []string{"product", "support", "internal"},
			Salary:       proto.Int64(1),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
		{
			Id:           proto.Int64(2),
			Username:     proto.String("bob"),
			GivenName:    proto.String("Bob Bobberson"),
			Departments:  []string{"research"},
			Salary:       proto.Int64(100000),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
		{
			Id:           proto.Int64(3),
			Username:     proto.String("clarice"),
			GivenName:    proto.String("Clarice Clearwater"),
			Departments:  []string{"product"},
			Salary:       proto.Int64(100001),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
	}

	// First append inserts all the initial employees.
	data := make([][]byte, len(initialEmployees))
	for k, mesg := range initialEmployees {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal record %d: %v", k, err)
		}
		data[k] = b
	}
	result, err := writer.AppendRows(ctx, data)
	if err != nil {
		t.Errorf("initial insert failed (%s): %v", writer.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("result error for initial insert (%s): %v", writer.StreamName(), err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "initial inserts",
		withExactRowCount(int64(len(initialEmployees))))

	// Create a second writer for applying modifications.
	updateWriter, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer updateWriter.Close()

	// Change bob via an UPSERT CDC.  Row count and id cardinality should be
	// unchanged afterwards — the row is replaced, not added.
	newBob := proto.Clone(initialEmployees[1]).(*testdata.ExampleEmployeeCDC)
	newBob.Salary = proto.Int64(105000)
	newBob.Departments = []string{"research", "product"}
	newBob.XCHANGE_TYPE = proto.String("UPSERT")
	b, err := proto.Marshal(newBob)
	if err != nil {
		t.Fatalf("failed to marshal new bob: %v", err)
	}
	result, err = updateWriter.AppendRows(ctx, [][]byte{b})
	if err != nil {
		t.Fatalf("bob modification failed (%s): %v", updateWriter.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Fatalf("result error for bob modification (%s): %v", updateWriter.StreamName(), err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after bob modification",
		withExactRowCount(int64(len(initialEmployees))),
		withDistinctValues("id", int64(len(initialEmployees))))

	// remove clarice via DELETE CDC; only the key field is needed.
	removeClarice := &testdata.ExampleEmployeeCDC{
		Id:           proto.Int64(3),
		XCHANGE_TYPE: proto.String("DELETE"),
	}
	b, err = proto.Marshal(removeClarice)
	if err != nil {
		t.Fatalf("failed to marshal clarice removal: %v", err)
	}
	result, err = updateWriter.AppendRows(ctx, [][]byte{b})
	if err != nil {
		t.Fatalf("clarice removal failed (%s): %v", updateWriter.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Fatalf("result error for clarice removal (%s): %v", updateWriter.StreamName(), err)
	}

	// One fewer row after the delete.
	validateTableConstraints(ctx, t, bqClient, testTable, "after clarice removal",
		withExactRowCount(int64(len(initialEmployees))-1))
}
   701  
   702  // testErrorBehaviors intentionally issues problematic requests to verify error behaviors.
   703  func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
   704  	testTable := dataset.Table(tableIDs.New())
   705  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
   706  		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
   707  	}
   708  
   709  	m := &testdata.SimpleMessageProto2{}
   710  	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
   711  
   712  	// setup a new stream.
   713  	ms, err := mwClient.NewManagedStream(ctx,
   714  		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
   715  		WithType(CommittedStream),
   716  		WithSchemaDescriptor(descriptorProto),
   717  	)
   718  	if err != nil {
   719  		t.Fatalf("NewManagedStream: %v", err)
   720  	}
   721  	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
   722  		withExactRowCount(0))
   723  
   724  	data := make([][]byte, len(testSimpleData))
   725  	for k, mesg := range testSimpleData {
   726  		b, err := proto.Marshal(mesg)
   727  		if err != nil {
   728  			t.Errorf("failed to marshal message %d: %v", k, err)
   729  		}
   730  		data[k] = b
   731  	}
   732  
   733  	// Send an append at an invalid offset.
   734  	result, err := ms.AppendRows(ctx, data, WithOffset(99))
   735  	if err != nil {
   736  		t.Errorf("failed to send append: %v", err)
   737  	}
   738  	//
   739  	off, err := result.GetResult(ctx)
   740  	if err == nil {
   741  		t.Errorf("expected error, got offset %d", off)
   742  	}
   743  
   744  	apiErr, ok := apierror.FromError(err)
   745  	if !ok {
   746  		t.Errorf("expected apierror, got %T: %v", err, err)
   747  	}
   748  	se := &storagepb.StorageError{}
   749  	e := apiErr.Details().ExtractProtoMessage(se)
   750  	if e != nil {
   751  		t.Errorf("expected storage error, but extraction failed: %v", e)
   752  	}
   753  	wantCode := storagepb.StorageError_OFFSET_OUT_OF_RANGE
   754  	if se.GetCode() != wantCode {
   755  		t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
   756  	}
   757  	// Send "real" append to advance the offset.
   758  	result, err = ms.AppendRows(ctx, data, WithOffset(0))
   759  	if err != nil {
   760  		t.Errorf("failed to send append: %v", err)
   761  	}
   762  	off, err = result.GetResult(ctx)
   763  	if err != nil {
   764  		t.Errorf("expected offset, got error %v", err)
   765  	}
   766  	wantOffset := int64(0)
   767  	if off != wantOffset {
   768  		t.Errorf("offset mismatch, got %d want %d", off, wantOffset)
   769  	}
   770  	// Now, send at the start offset again.
   771  	result, err = ms.AppendRows(ctx, data, WithOffset(0))
   772  	if err != nil {
   773  		t.Errorf("failed to send append: %v", err)
   774  	}
   775  	off, err = result.GetResult(ctx)
   776  	if err == nil {
   777  		t.Errorf("expected error, got offset %d", off)
   778  	}
   779  	apiErr, ok = apierror.FromError(err)
   780  	if !ok {
   781  		t.Errorf("expected apierror, got %T: %v", err, err)
   782  	}
   783  	se = &storagepb.StorageError{}
   784  	e = apiErr.Details().ExtractProtoMessage(se)
   785  	if e != nil {
   786  		t.Errorf("expected storage error, but extraction failed: %v", e)
   787  	}
   788  	wantCode = storagepb.StorageError_OFFSET_ALREADY_EXISTS
   789  	if se.GetCode() != wantCode {
   790  		t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
   791  	}
   792  	// Finalize the stream.
   793  	if _, err := ms.Finalize(ctx); err != nil {
   794  		t.Errorf("Finalize had error: %v", err)
   795  	}
   796  	// Send another append, which is disallowed for finalized streams.
   797  	result, err = ms.AppendRows(ctx, data)
   798  	if err != nil {
   799  		t.Errorf("failed to send append: %v", err)
   800  	}
   801  	off, err = result.GetResult(ctx)
   802  	if err == nil {
   803  		t.Errorf("expected error, got offset %d", off)
   804  	}
   805  	apiErr, ok = apierror.FromError(err)
   806  	if !ok {
   807  		t.Errorf("expected apierror, got %T: %v", err, err)
   808  	}
   809  	se = &storagepb.StorageError{}
   810  	e = apiErr.Details().ExtractProtoMessage(se)
   811  	if e != nil {
   812  		t.Errorf("expected storage error, but extraction failed: %v", e)
   813  	}
   814  	wantCode = storagepb.StorageError_STREAM_FINALIZED
   815  	if se.GetCode() != wantCode {
   816  		t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
   817  	}
   818  }
   819  
   820  func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
   821  	testTable := dataset.Table(tableIDs.New())
   822  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
   823  		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
   824  	}
   825  
   826  	m := &testdata.SimpleMessageProto2{}
   827  	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
   828  
   829  	ms, err := mwClient.NewManagedStream(ctx,
   830  		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
   831  		WithType(PendingStream),
   832  		WithSchemaDescriptor(descriptorProto),
   833  	)
   834  	if err != nil {
   835  		t.Fatalf("NewManagedStream: %v", err)
   836  	}
   837  	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
   838  		withExactRowCount(0))
   839  
   840  	// Send data.
   841  	var result *AppendResult
   842  	for k, mesg := range testSimpleData {
   843  		b, err := proto.Marshal(mesg)
   844  		if err != nil {
   845  			t.Errorf("failed to marshal message %d: %v", k, err)
   846  		}
   847  		data := [][]byte{b}
   848  		result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
   849  		if err != nil {
   850  			t.Errorf("single-row append %d failed: %v", k, err)
   851  		}
   852  		// Be explicit about waiting/checking each response.
   853  		off, err := result.GetResult(ctx)
   854  		if err != nil {
   855  			t.Errorf("response %d error: %v", k, err)
   856  		}
   857  		if off != int64(k) {
   858  			t.Errorf("offset mismatch, got %d want %d", off, k)
   859  		}
   860  	}
   861  	wantRows := int64(len(testSimpleData))
   862  
   863  	// Mark stream complete.
   864  	trackedOffset, err := ms.Finalize(ctx)
   865  	if err != nil {
   866  		t.Errorf("Finalize: %v", err)
   867  	}
   868  
   869  	if trackedOffset != wantRows {
   870  		t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
   871  	}
   872  
   873  	// Commit stream and validate.
   874  	req := &storagepb.BatchCommitWriteStreamsRequest{
   875  		Parent:       TableParentFromStreamName(ms.StreamName()),
   876  		WriteStreams: []string{ms.StreamName()},
   877  	}
   878  
   879  	resp, err := mwClient.BatchCommitWriteStreams(ctx, req)
   880  	if err != nil {
   881  		t.Errorf("client.BatchCommit: %v", err)
   882  	}
   883  	if len(resp.StreamErrors) > 0 {
   884  		t.Errorf("stream errors present: %v", resp.StreamErrors)
   885  	}
   886  	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
   887  		withExactRowCount(int64(len(testSimpleData))))
   888  }
   889  
   890  func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
   891  	testTable := dataset.Table(tableIDs.New())
   892  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
   893  		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
   894  	}
   895  
   896  	m := &testdata.SimpleMessageProto2{}
   897  	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
   898  
   899  	ms, err := mwClient.NewManagedStream(ctx,
   900  		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
   901  		WithType(CommittedStream),
   902  		WithSchemaDescriptor(descriptorProto),
   903  	)
   904  	if err != nil {
   905  		t.Fatalf("NewManagedStream: %v", err)
   906  	}
   907  	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
   908  		withExactRowCount(0))
   909  
   910  	// Construct a Very Large request.
   911  	var data [][]byte
   912  	targetSize := 11 * 1024 * 1024 // 11 MB
   913  	b, err := proto.Marshal(testSimpleData[0])
   914  	if err != nil {
   915  		t.Errorf("failed to marshal message: %v", err)
   916  	}
   917  
   918  	numRows := targetSize / len(b)
   919  	data = make([][]byte, numRows)
   920  
   921  	for i := 0; i < numRows; i++ {
   922  		data[i] = b
   923  	}
   924  
   925  	result, err := ms.AppendRows(ctx, data, WithOffset(0))
   926  	if err != nil {
   927  		t.Errorf("single append failed: %v", err)
   928  	}
   929  	_, err = result.GetResult(ctx)
   930  	if err != nil {
   931  		apiErr, ok := apierror.FromError(err)
   932  		if !ok {
   933  			t.Errorf("GetResult error was not an instance of ApiError")
   934  		}
   935  		status := apiErr.GRPCStatus()
   936  		if status.Code() != codes.InvalidArgument {
   937  			t.Errorf("expected InvalidArgument status, got %v", status)
   938  		}
   939  	}
   940  	// our next append is small and should succeed.
   941  	result, err = ms.AppendRows(ctx, [][]byte{b})
   942  	if err != nil {
   943  		t.Fatalf("second append failed: %v", err)
   944  	}
   945  	_, err = result.GetResult(ctx)
   946  	if err != nil {
   947  		t.Errorf("failure result from second append: %v", err)
   948  	}
   949  
   950  	validateTableConstraints(ctx, t, bqClient, testTable, "final",
   951  		withExactRowCount(1))
   952  }
   953  
   954  func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
   955  	testTable := dataset.Table(tableIDs.New())
   956  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
   957  		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
   958  	}
   959  
   960  	m := &testdata.SimpleMessageProto2{}
   961  	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
   962  
   963  	ms, err := mwClient.NewManagedStream(ctx,
   964  		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
   965  		WithType(CommittedStream),
   966  		WithSchemaDescriptor(descriptorProto),
   967  		EnableWriteRetries(true),
   968  	)
   969  	if err != nil {
   970  		t.Fatalf("NewManagedStream: %v", err)
   971  	}
   972  	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
   973  		withExactRowCount(0))
   974  
   975  	// Construct a Very Large request.
   976  	var data [][]byte
   977  	targetSize := 11 * 1024 * 1024 // 11 MB
   978  	b, err := proto.Marshal(testSimpleData[0])
   979  	if err != nil {
   980  		t.Errorf("failed to marshal message: %v", err)
   981  	}
   982  
   983  	numRows := targetSize / len(b)
   984  	data = make([][]byte, numRows)
   985  
   986  	for i := 0; i < numRows; i++ {
   987  		data[i] = b
   988  	}
   989  
   990  	result, err := ms.AppendRows(ctx, data, WithOffset(0))
   991  	if err != nil {
   992  		t.Errorf("single append failed: %v", err)
   993  	}
   994  	_, err = result.GetResult(ctx)
   995  	if err != nil {
   996  		apiErr, ok := apierror.FromError(err)
   997  		if !ok {
   998  			t.Errorf("GetResult error was not an instance of ApiError")
   999  		}
  1000  		status := apiErr.GRPCStatus()
  1001  		if status.Code() != codes.InvalidArgument {
  1002  			t.Errorf("expected InvalidArgument status, got %v", status)
  1003  		}
  1004  	}
  1005  
  1006  	// The second append will succeed.
  1007  	result, err = ms.AppendRows(ctx, [][]byte{b})
  1008  	if err != nil {
  1009  		t.Fatalf("second append expected to succeed, got error: %v", err)
  1010  	}
  1011  	_, err = result.GetResult(ctx)
  1012  	if err != nil {
  1013  		t.Errorf("failure result from second append: %v", err)
  1014  	}
  1015  	if attempts, _ := result.TotalAttempts(ctx); attempts != 1 {
  1016  		t.Errorf("expected 1 attempts, got %d", attempts)
  1017  	}
  1018  }
  1019  
  1020  func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
  1021  	testedViews := []*view.View{
  1022  		AppendRequestsView,
  1023  		AppendResponsesView,
  1024  		AppendClientOpenView,
  1025  	}
  1026  
  1027  	if err := view.Register(testedViews...); err != nil {
  1028  		t.Fatalf("couldn't register views: %v", err)
  1029  	}
  1030  
  1031  	testTable := dataset.Table(tableIDs.New())
  1032  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
  1033  		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
  1034  	}
  1035  
  1036  	m := &testdata.SimpleMessageProto2{}
  1037  	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
  1038  
  1039  	// setup a new stream.
  1040  	ms, err := mwClient.NewManagedStream(ctx,
  1041  		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
  1042  		WithType(DefaultStream),
  1043  		WithSchemaDescriptor(descriptorProto),
  1044  	)
  1045  	if err != nil {
  1046  		t.Fatalf("NewManagedStream: %v", err)
  1047  	}
  1048  
  1049  	var result *AppendResult
  1050  	for k, mesg := range testSimpleData {
  1051  		b, err := proto.Marshal(mesg)
  1052  		if err != nil {
  1053  			t.Errorf("failed to marshal message %d: %v", k, err)
  1054  		}
  1055  		data := [][]byte{b}
  1056  		result, err = ms.AppendRows(ctx, data)
  1057  		if err != nil {
  1058  			t.Errorf("single-row append %d failed: %v", k, err)
  1059  		}
  1060  	}
  1061  	// Wait for the result to indicate ready.
  1062  	result.Ready()
  1063  	// Ick.  Stats reporting can't force flushing, and there's a race here.  Sleep to give the recv goroutine a chance
  1064  	// to report.
  1065  	time.Sleep(time.Second)
  1066  
  1067  	// metric to key tag names
  1068  	wantTags := map[string][]string{
  1069  		"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count":       {"error"},
  1070  		"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_retry_count": nil,
  1071  		"cloud.google.com/go/bigquery/storage/managedwriter/append_requests":         {"streamID"},
  1072  		"cloud.google.com/go/bigquery/storage/managedwriter/append_request_bytes":    {"streamID"},
  1073  		"cloud.google.com/go/bigquery/storage/managedwriter/append_request_errors":   {"streamID"},
  1074  		"cloud.google.com/go/bigquery/storage/managedwriter/append_rows":             {"streamID"},
  1075  	}
  1076  
  1077  	for _, tv := range testedViews {
  1078  		// Attempt to further improve race failures by retrying metrics retrieval.
  1079  		metricData, err := func() ([]*view.Row, error) {
  1080  			attempt := 0
  1081  			for {
  1082  				data, err := view.RetrieveData(tv.Name)
  1083  				attempt = attempt + 1
  1084  				if attempt > 5 {
  1085  					return data, err
  1086  				}
  1087  				if err == nil && len(data) == 1 {
  1088  					return data, err
  1089  				}
  1090  				time.Sleep(time.Duration(attempt) * 500 * time.Millisecond)
  1091  			}
  1092  		}()
  1093  		if err != nil {
  1094  			t.Errorf("view %q RetrieveData: %v", tv.Name, err)
  1095  		}
  1096  		if mlen := len(metricData); mlen != 1 {
  1097  			t.Errorf("%q: expected 1 row of metrics, got %d", tv.Name, mlen)
  1098  			continue
  1099  		}
  1100  		if wantKeys, ok := wantTags[tv.Name]; ok {
  1101  			if wantKeys == nil {
  1102  				if n := len(tv.TagKeys); n != 0 {
  1103  					t.Errorf("expected view %q to have no keys, but %d present", tv.Name, n)
  1104  				}
  1105  			} else {
  1106  				for _, wk := range wantKeys {
  1107  					var found bool
  1108  					for _, gk := range tv.TagKeys {
  1109  						if gk.Name() == wk {
  1110  							found = true
  1111  							break
  1112  						}
  1113  					}
  1114  					if !found {
  1115  						t.Errorf("expected view %q to have key %q, but wasn't present", tv.Name, wk)
  1116  					}
  1117  				}
  1118  			}
  1119  		}
  1120  		entry := metricData[0].Data
  1121  		sum, ok := entry.(*view.SumData)
  1122  		if !ok {
  1123  			t.Errorf("unexpected metric type: %T", entry)
  1124  		}
  1125  		got := sum.Value
  1126  		var want int64
  1127  		switch tv {
  1128  		case AppendRequestsView:
  1129  			want = int64(len(testSimpleData))
  1130  		case AppendResponsesView:
  1131  			want = int64(len(testSimpleData))
  1132  		case AppendClientOpenView:
  1133  			want = 1
  1134  		}
  1135  
  1136  		// float comparison; diff more than error bound is error
  1137  		if math.Abs(got-float64(want)) > 0.1 {
  1138  			t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want)
  1139  		}
  1140  	}
  1141  }
  1142  
// testSchemaEvolution verifies a managed stream keeps working across a
// destination-table schema widening: it writes with the original schema,
// evolves the table, waits for the stream to surface the updated schema,
// then appends rows using an evolved descriptor (including concurrently).
func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream, layering destination/schema options atop any caller-supplied ones.
	opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
	opts = append(opts, WithSchemaDescriptor(descriptorProto))
	ms, err := mwClient.NewManagedStream(ctx, opts...)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	var result *AppendResult
	var curOffset int64
	// latestRow retains the last marshaled row so it can be resent below
	// while polling for the schema-update notification.
	var latestRow []byte
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		latestRow = b
		data := [][]byte{b}
		result, err = ms.AppendRows(ctx, data)
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
		curOffset = curOffset + int64(len(data))
	}
	// Wait for the result to indicate ready, then validate.
	// NOTE(review): only the final append's result is awaited — presumably
	// the earlier appends on the same stream complete first; confirm ordering.
	_, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("error on append: %v", err)
	}

	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(testSimpleData))))

	// Now, evolve the underlying table schema.
	_, err = testTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: testdata.SimpleMessageEvolvedSchema}, "")
	if err != nil {
		t.Errorf("failed to evolve table schema: %v", err)
	}

	// Resend latest row until we get a new schema notification.
	// It _should_ be possible to send duplicates, but this currently will not propagate the schema error.
	// Internal issue: b/211899346
	//
	// The alternative here would be to block on GetWriteStream until we get a different write stream, but
	// this subjects us to a possible race, as the backend that services GetWriteStream isn't necessarily the
	// one in charge of the stream, and thus may report ready early.
	for {
		resp, err := ms.AppendRows(ctx, [][]byte{latestRow})
		if err != nil {
			t.Errorf("got error on dupe append: %v", err)
			break
		}
		curOffset = curOffset + 1
		s, err := resp.UpdatedSchema(ctx)
		if err != nil {
			t.Errorf("getting schema error: %v", err)
		}
		// A non-nil schema means the stream has observed the evolution.
		if s != nil {
			break
		}
	}

	// ready evolved message and descriptor
	m2 := &testdata.SimpleMessageEvolvedProto2{
		Name:  proto.String("evolved"),
		Value: proto.Int64(180),
		Other: proto.String("hello evolution"),
	}
	descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())
	b, err := proto.Marshal(m2)
	if err != nil {
		t.Errorf("failed to marshal evolved message: %v", err)
	}
	// Send an append with an evolved schema
	res, err := ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
	if err != nil {
		t.Errorf("failed evolved append: %v", err)
	}
	_, err = res.GetResult(ctx)
	if err != nil {
		t.Errorf("error on evolved append: %v", err)
	}
	curOffset = curOffset + 1

	// Try to force connection errors from concurrent appends.
	// We drop setting of offset to avoid commingling out-of-order append errors.
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		// Capture the loop variable for the goroutine (pre-Go 1.22 semantics).
		id := i
		wg.Add(1)
		go func() {
			res, err := ms.AppendRows(ctx, [][]byte{b})
			if err != nil {
				t.Errorf("failed concurrent append %d: %v", id, err)
			}
			_, err = res.GetResult(ctx)
			if err != nil {
				t.Errorf("error on concurrent append %d: %v", id, err)
			}
			wg.Done()
		}()
	}
	wg.Wait()

	// curOffset counts the sequential appends; the 5 concurrent appends above
	// account for the +5. All 6 evolved-message rows populate "other".
	validateTableConstraints(ctx, t, bqClient, testTable, "after evolved records send",
		withExactRowCount(int64(curOffset+5)),
		withNullCount("name", 0),
		withNonNullCount("other", 6),
	)
}
  1264  
// testDefaultValueHandling exercises missing-value interpretation (MVI)
// controls: it writes a row that only populates "id" via a partial-schema
// descriptor, then flips the default MVI and a per-column MVI and validates
// null vs. default-value counts after each append.
func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.DefaultValuesPartialSchema{
		// We only populate the id, as remaining fields are used to test default values.
		Id: proto.String("someval"),
	}
	var data []byte
	var err error
	if data, err = proto.Marshal(m); err != nil {
		t.Fatalf("failed to marshal test row data")
	}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream, layering destination/schema options atop any caller-supplied ones.
	opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
	opts = append(opts, WithSchemaDescriptor(descriptorProto))
	ms, err := mwClient.NewManagedStream(ctx, opts...)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	var result *AppendResult

	// Send one row with no MVI overrides: unset fields in the partial schema
	// arrive as nulls, fields outside the descriptor get their defaults.

	result, err = ms.AppendRows(ctx, [][]byte{data})
	if err != nil {
		t.Errorf("append failed: %v", err)
	}
	// Wait for the result to indicate ready, then validate.
	_, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("error on append: %v", err)
	}

	validateTableConstraints(ctx, t, bqClient, testTable, "after first row",
		withExactRowCount(1),
		withNonNullCount("id", 1),
		withNullCount("strcol_withdef", 1),
		withNullCount("intcol_withdef", 1),
		withNullCount("otherstr_withdef", 0)) // not part of partial schema; gets its default value

	// Change the default MVI to DEFAULT_VALUE.
	// Unset fields within the partial schema now receive their declared
	// default values rather than nulls (the null counts below do not grow).
	// Fields outside the partial schema continue to obey default values.
	result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE))
	if err != nil {
		t.Errorf("append failed: %v", err)
	}
	// Wait for the result to indicate ready, then validate.
	_, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("error on append: %v", err)
	}

	validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)",
		withExactRowCount(2),
		withNullCount("strcol_withdef", 1), // doesn't increment, as it gets default value
		withNullCount("intcol_withdef", 1)) // doesn't increment, as it gets default value

	// Add a per-column MVI override: strcol_withdef reverts to NULL_VALUE
	// while the stream-level default (DEFAULT_VALUE) still governs the rest.
	result, err = ms.AppendRows(ctx, [][]byte{data},
		UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
			"strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE,
		}))
	if err != nil {
		t.Errorf("append failed: %v", err)
	}
	// Wait for the result to indicate ready, then validate.
	_, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("error on append: %v", err)
	}

	validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)",
		withExactRowCount(3),
		withNullCount("strcol_withdef", 2),      // increments as it's null for this column
		withNullCount("intcol_withdef", 1),      // doesn't increment, still default value
		withNonNullCount("otherstr_withdef", 3), // not part of descriptor, always gets default value
		withNullCount("otherstr", 3),            // not part of descriptor, always gets null
		withNullCount("strcol", 3),              // no default value defined, always gets null
		withNullCount("intcol", 3),              // no default value defined, always gets null
	)
}
  1355  
  1356  func TestIntegration_DetectProjectID(t *testing.T) {
  1357  	ctx := context.Background()
  1358  	testCreds := testutil.Credentials(ctx)
  1359  	if testCreds == nil {
  1360  		t.Skip("test credentials not present, skipping")
  1361  	}
  1362  
  1363  	if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil {
  1364  		t.Errorf("test NewClient: %v", err)
  1365  	}
  1366  
  1367  	badTS := testutil.ErroringTokenSource{}
  1368  
  1369  	if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
  1370  		t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID)
  1371  	}
  1372  }
  1373  
  1374  func TestIntegration_ProtoNormalization(t *testing.T) {
  1375  	mwClient, bqClient := getTestClients(context.Background(), t)
  1376  	defer mwClient.Close()
  1377  	defer bqClient.Close()
  1378  
  1379  	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
  1380  	if err != nil {
  1381  		t.Fatalf("failed to init test dataset: %v", err)
  1382  	}
  1383  	defer cleanup()
  1384  
  1385  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1386  	defer cancel()
  1387  
  1388  	t.Run("group", func(t *testing.T) {
  1389  		t.Run("ComplexType", func(t *testing.T) {
  1390  			t.Parallel()
  1391  			schema := testdata.ComplexTypeSchema
  1392  			mesg := &testdata.ComplexType{
  1393  				NestedRepeatedType: []*testdata.NestedType{
  1394  					{
  1395  						InnerType: []*testdata.InnerType{
  1396  							{Value: []string{"a", "b", "c"}},
  1397  							{Value: []string{"x", "y", "z"}},
  1398  						},
  1399  					},
  1400  				},
  1401  				InnerType: &testdata.InnerType{
  1402  					Value: []string{"top"},
  1403  				},
  1404  				RangeType: &testdata.RangeTypeTimestamp{
  1405  					End: proto.Int64(time.Now().UnixNano()),
  1406  				},
  1407  			}
  1408  			b, err := proto.Marshal(mesg)
  1409  			if err != nil {
  1410  				t.Fatalf("proto.Marshal: %v", err)
  1411  			}
  1412  			descriptor := (mesg).ProtoReflect().Descriptor()
  1413  			testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b)
  1414  		})
  1415  		t.Run("WithWellKnownTypes", func(t *testing.T) {
  1416  			t.Parallel()
  1417  			schema := testdata.WithWellKnownTypesSchema
  1418  			mesg := &testdata.WithWellKnownTypes{
  1419  				Int64Value: proto.Int64(123),
  1420  				WrappedInt64: &wrapperspb.Int64Value{
  1421  					Value: 456,
  1422  				},
  1423  				StringValue: []string{"a", "b"},
  1424  				WrappedString: []*wrapperspb.StringValue{
  1425  					{Value: "foo"},
  1426  					{Value: "bar"},
  1427  				},
  1428  			}
  1429  			b, err := proto.Marshal(mesg)
  1430  			if err != nil {
  1431  				t.Fatalf("proto.Marshal: %v", err)
  1432  			}
  1433  			descriptor := (mesg).ProtoReflect().Descriptor()
  1434  			testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b)
  1435  		})
  1436  		t.Run("WithExternalEnum", func(t *testing.T) {
  1437  			t.Parallel()
  1438  			schema := testdata.ExternalEnumMessageSchema
  1439  			mesg := &testdata.ExternalEnumMessage{
  1440  				MsgA: &testdata.EnumMsgA{
  1441  					Foo: proto.String("foo"),
  1442  					Bar: testdata.ExtEnum_THING.Enum(),
  1443  				},
  1444  				MsgB: &testdata.EnumMsgB{
  1445  					Baz: testdata.ExtEnum_OTHER_THING.Enum(),
  1446  				},
  1447  			}
  1448  			b, err := proto.Marshal(mesg)
  1449  			if err != nil {
  1450  				t.Fatalf("proto.Marshal: %v", err)
  1451  			}
  1452  			descriptor := (mesg).ProtoReflect().Descriptor()
  1453  			testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b)
  1454  		})
  1455  	})
  1456  }
  1457  
  1458  func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, schema bigquery.Schema, descriptor protoreflect.MessageDescriptor, sampleRow []byte) {
  1459  	testTable := dataset.Table(tableIDs.New())
  1460  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
  1461  		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
  1462  	}
  1463  
  1464  	dp, err := adapt.NormalizeDescriptor(descriptor)
  1465  	if err != nil {
  1466  		t.Fatalf("NormalizeDescriptor: %v", err)
  1467  	}
  1468  
  1469  	// setup a new stream.
  1470  	ms, err := mwClient.NewManagedStream(ctx,
  1471  		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
  1472  		WithType(DefaultStream),
  1473  		WithSchemaDescriptor(dp),
  1474  	)
  1475  	if err != nil {
  1476  		t.Fatalf("NewManagedStream: %v", err)
  1477  	}
  1478  	result, err := ms.AppendRows(ctx, [][]byte{sampleRow})
  1479  	if err != nil {
  1480  		t.Errorf("append failed: %v", err)
  1481  	}
  1482  
  1483  	_, err = result.GetResult(ctx)
  1484  	if err != nil {
  1485  		t.Errorf("error in response: %v", err)
  1486  	}
  1487  }
  1488  
  1489  func TestIntegration_MultiplexWrites(t *testing.T) {
  1490  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1491  	defer cancel()
  1492  	mwClient, bqClient := getTestClients(ctx, t,
  1493  		WithMultiplexing(),
  1494  		WithMultiplexPoolLimit(2),
  1495  	)
  1496  	defer mwClient.Close()
  1497  	defer bqClient.Close()
  1498  
  1499  	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, "us-east1")
  1500  	if err != nil {
  1501  		t.Fatalf("failed to init test dataset: %v", err)
  1502  	}
  1503  	defer cleanup()
  1504  
  1505  	wantWrites := 10
  1506  
  1507  	testTables := []struct {
  1508  		tbl         *bigquery.Table
  1509  		schema      bigquery.Schema
  1510  		dp          *descriptorpb.DescriptorProto
  1511  		sampleRow   []byte
  1512  		constraints []constraintOption
  1513  	}{
  1514  		{
  1515  			tbl:    dataset.Table(tableIDs.New()),
  1516  			schema: testdata.SimpleMessageSchema,
  1517  			dp: func() *descriptorpb.DescriptorProto {
  1518  				m := &testdata.SimpleMessageProto2{}
  1519  				dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
  1520  				return dp
  1521  			}(),
  1522  			sampleRow: func() []byte {
  1523  				msg := &testdata.SimpleMessageProto2{
  1524  					Name:  proto.String("sample_name"),
  1525  					Value: proto.Int64(1001),
  1526  				}
  1527  				b, _ := proto.Marshal(msg)
  1528  				return b
  1529  			}(),
  1530  			constraints: []constraintOption{
  1531  				withExactRowCount(int64(wantWrites)),
  1532  				withStringValueCount("name", "sample_name", int64(wantWrites)),
  1533  			},
  1534  		},
  1535  		{
  1536  			tbl:    dataset.Table(tableIDs.New()),
  1537  			schema: testdata.ValidationBaseSchema,
  1538  			dp: func() *descriptorpb.DescriptorProto {
  1539  				m := &testdata.ValidationP2Optional{}
  1540  				dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
  1541  				return dp
  1542  			}(),
  1543  			sampleRow: func() []byte {
  1544  				msg := &testdata.ValidationP2Optional{
  1545  					Int64Field:  proto.Int64(69),
  1546  					StringField: proto.String("validation_string"),
  1547  				}
  1548  				b, _ := proto.Marshal(msg)
  1549  				return b
  1550  			}(),
  1551  			constraints: []constraintOption{
  1552  				withExactRowCount(int64(wantWrites)),
  1553  				withStringValueCount("string_field", "validation_string", int64(wantWrites)),
  1554  			},
  1555  		},
  1556  		{
  1557  			tbl:    dataset.Table(tableIDs.New()),
  1558  			schema: testdata.GithubArchiveSchema,
  1559  			dp: func() *descriptorpb.DescriptorProto {
  1560  				m := &testdata.GithubArchiveMessageProto2{}
  1561  				dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
  1562  				return dp
  1563  			}(),
  1564  			sampleRow: func() []byte {
  1565  				msg := &testdata.GithubArchiveMessageProto2{
  1566  					Payload: proto.String("payload_string"),
  1567  					Id:      proto.String("some_id"),
  1568  				}
  1569  				b, _ := proto.Marshal(msg)
  1570  				return b
  1571  			}(),
  1572  			constraints: []constraintOption{
  1573  				withExactRowCount(int64(wantWrites)),
  1574  				withStringValueCount("payload", "payload_string", int64(wantWrites)),
  1575  			},
  1576  		},
  1577  	}
  1578  
  1579  	// setup tables
  1580  	for _, testTable := range testTables {
  1581  		if err := testTable.tbl.Create(ctx, &bigquery.TableMetadata{Schema: testTable.schema}); err != nil {
  1582  			t.Fatalf("failed to create test table %q: %v", testTable.tbl.FullyQualifiedName(), err)
  1583  		}
  1584  	}
  1585  
  1586  	var gotFirstPool *connectionPool
  1587  	var results []*AppendResult
  1588  	for i := 0; i < wantWrites; i++ {
  1589  		for k, testTable := range testTables {
  1590  			// create a writer and send a single append
  1591  			ms, err := mwClient.NewManagedStream(ctx,
  1592  				WithDestinationTable(TableParentFromParts(testTable.tbl.ProjectID, testTable.tbl.DatasetID, testTable.tbl.TableID)),
  1593  				WithType(DefaultStream),
  1594  				WithSchemaDescriptor(testTable.dp),
  1595  				EnableWriteRetries(true),
  1596  			)
  1597  			if err != nil {
  1598  				t.Fatalf("NewManagedStream %d: %v", k, err)
  1599  			}
  1600  			if i == 0 && k == 0 {
  1601  				if ms.pool == nil {
  1602  					t.Errorf("expected a non-nil pool reference for first writer")
  1603  				}
  1604  				gotFirstPool = ms.pool
  1605  			} else {
  1606  				if ms.pool != gotFirstPool {
  1607  					t.Errorf("expected same pool reference, got a different pool")
  1608  				}
  1609  			}
  1610  			defer ms.Close() // we won't clean these up until the end of the test, rather than per use.
  1611  			if err != nil {
  1612  				t.Fatalf("failed to create ManagedStream for table %d on iteration %d: %v", k, i, err)
  1613  			}
  1614  			res, err := ms.AppendRows(ctx, [][]byte{testTable.sampleRow})
  1615  			if err != nil {
  1616  				t.Fatalf("failed to append to table %d on iteration %d: %v", k, i, err)
  1617  			}
  1618  			results = append(results, res)
  1619  		}
  1620  	}
  1621  
  1622  	// drain results
  1623  	for k, res := range results {
  1624  		if _, err := res.GetResult(ctx); err != nil {
  1625  			t.Errorf("result %d yielded error: %v", k, err)
  1626  		}
  1627  	}
  1628  
  1629  	// validate the tables
  1630  	for _, testTable := range testTables {
  1631  		validateTableConstraints(ctx, t, bqClient, testTable.tbl, "", testTable.constraints...)
  1632  	}
  1633  
  1634  }
  1635  
  1636  func TestIntegration_MingledContexts(t *testing.T) {
  1637  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1638  	defer cancel()
  1639  	mwClient, bqClient := getTestClients(ctx, t,
  1640  		WithMultiplexing(),
  1641  		WithMultiplexPoolLimit(2),
  1642  	)
  1643  	defer mwClient.Close()
  1644  	defer bqClient.Close()
  1645  
  1646  	wantLocation := "us-east4"
  1647  
  1648  	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
  1649  	if err != nil {
  1650  		t.Fatalf("failed to init test dataset: %v", err)
  1651  	}
  1652  	defer cleanup()
  1653  
  1654  	testTable := dataset.Table(tableIDs.New())
  1655  	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
  1656  		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
  1657  	}
  1658  
  1659  	m := &testdata.SimpleMessageProto2{}
  1660  	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
  1661  
  1662  	numWriters := 4
  1663  	contexts := make([]context.Context, numWriters)
  1664  	cancels := make([]context.CancelFunc, numWriters)
  1665  	writers := make([]*ManagedStream, numWriters)
  1666  	for i := 0; i < numWriters; i++ {
  1667  		contexts[i], cancels[i] = context.WithCancel(ctx)
  1668  		ms, err := mwClient.NewManagedStream(contexts[i],
  1669  			WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
  1670  			WithType(DefaultStream),
  1671  			WithSchemaDescriptor(descriptorProto),
  1672  		)
  1673  		if err != nil {
  1674  			t.Fatalf("instantating writer %d failed: %v", i, err)
  1675  		}
  1676  		writers[i] = ms
  1677  	}
  1678  
  1679  	sampleRow, err := proto.Marshal(&testdata.SimpleMessageProto2{
  1680  		Name:  proto.String("datafield"),
  1681  		Value: proto.Int64(1234),
  1682  	})
  1683  	if err != nil {
  1684  		t.Fatalf("failed to generate sample row")
  1685  	}
  1686  
  1687  	for i := 0; i < numWriters; i++ {
  1688  		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
  1689  		if err != nil {
  1690  			t.Errorf("initial write on %d failed: %v", i, err)
  1691  		} else {
  1692  			if _, err := res.GetResult(contexts[i]); err != nil {
  1693  				t.Errorf("GetResult initial write %d: %v", i, err)
  1694  			}
  1695  		}
  1696  	}
  1697  
  1698  	// cancel the first context
  1699  	cancels[0]()
  1700  	// repeat writes on all other writers with the second context
  1701  	for i := 1; i < numWriters; i++ {
  1702  		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
  1703  		if err != nil {
  1704  			t.Errorf("second write on %d failed: %v", i, err)
  1705  		} else {
  1706  			if _, err := res.GetResult(contexts[1]); err != nil {
  1707  				t.Errorf("GetResult err on second write %d: %v", i, err)
  1708  			}
  1709  		}
  1710  	}
  1711  
  1712  	// check that writes to the first writer should fail, even with a valid request context.
  1713  	if _, err := writers[0].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
  1714  		t.Errorf("write succeeded on first writer when it should have failed")
  1715  	}
  1716  
  1717  	// cancel the second context as well, ensure writer created with good context and bad request context fails
  1718  	cancels[1]()
  1719  	if _, err := writers[2].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
  1720  		t.Errorf("write succeeded on third writer with a bad request context")
  1721  	}
  1722  
  1723  	// repeat writes on remaining good writers/contexts
  1724  	for i := 2; i < numWriters; i++ {
  1725  		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
  1726  		if err != nil {
  1727  			t.Errorf("second write on %d failed: %v", i, err)
  1728  		} else {
  1729  			if _, err := res.GetResult(contexts[i]); err != nil {
  1730  				t.Errorf("GetResult err on second write %d: %v", i, err)
  1731  			}
  1732  		}
  1733  	}
  1734  }
  1735  

View as plain text