...

Source file src/cloud.google.com/go/bigquery/storage_integration_test.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2023 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"testing"
    22  	"time"
    23  
    24  	"cloud.google.com/go/internal/testutil"
    25  	"github.com/apache/arrow/go/v15/arrow"
    26  	"github.com/apache/arrow/go/v15/arrow/array"
    27  	"github.com/apache/arrow/go/v15/arrow/ipc"
    28  	"github.com/apache/arrow/go/v15/arrow/math"
    29  	"github.com/apache/arrow/go/v15/arrow/memory"
    30  	"github.com/google/go-cmp/cmp"
    31  	"google.golang.org/api/iterator"
    32  )
    33  
    34  func TestIntegration_StorageReadBasicTypes(t *testing.T) {
    35  	if client == nil {
    36  		t.Skip("Integration tests skipped")
    37  	}
    38  	ctx := context.Background()
    39  
    40  	initQueryParameterTestCases()
    41  
    42  	for _, c := range queryParameterTestCases {
    43  		t.Run(c.name, func(t *testing.T) {
    44  			q := storageOptimizedClient.Query(c.query)
    45  			q.Parameters = c.parameters
    46  			q.forceStorageAPI = true
    47  			it, err := q.Read(ctx)
    48  			if err != nil {
    49  				t.Fatal(err)
    50  			}
    51  			err = checkIteratorRead(it, c.wantRow)
    52  			if err != nil {
    53  				t.Fatalf("%s: error on query `%s`[%v]: %v", it.SourceJob().ID(), c.query, c.parameters, err)
    54  			}
    55  			if !it.IsAccelerated() {
    56  				t.Fatalf("%s: expected storage api to be used", it.SourceJob().ID())
    57  			}
    58  		})
    59  	}
    60  }
    61  
    62  func TestIntegration_StorageReadEmptyResultSet(t *testing.T) {
    63  	if client == nil {
    64  		t.Skip("Integration tests skipped")
    65  	}
    66  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    67  	defer cancel()
    68  
    69  	table := storageOptimizedClient.Dataset(dataset.DatasetID).Table(tableIDs.New())
    70  	err := table.Create(ctx, &TableMetadata{
    71  		Schema: Schema{
    72  			{Name: "name", Type: StringFieldType, Required: true},
    73  		},
    74  	})
    75  	if err != nil {
    76  		t.Fatal(err)
    77  	}
    78  	defer table.Delete(ctx)
    79  
    80  	it := table.Read(ctx)
    81  	err = checkIteratorRead(it, []Value{})
    82  	if err != nil {
    83  		t.Fatalf("failed to read empty table: %v", err)
    84  	}
    85  	if !it.IsAccelerated() {
    86  		t.Fatal("expected storage api to be used")
    87  	}
    88  }
    89  
    90  func TestIntegration_StorageReadFromSources(t *testing.T) {
    91  	if client == nil {
    92  		t.Skip("Integration tests skipped")
    93  	}
    94  	ctx := context.Background()
    95  
    96  	dstTable := dataset.Table(tableIDs.New())
    97  	dstTable.c = storageOptimizedClient
    98  
    99  	sql := `SELECT 1 as num, 'one' as str 
   100  UNION ALL 
   101  SELECT 2 as num, 'two' as str 
   102  UNION ALL 
   103  SELECT 3 as num, 'three' as str 
   104  ORDER BY num`
   105  	q := storageOptimizedClient.Query(sql)
   106  	q.Dst = dstTable
   107  	job, err := q.Run(ctx)
   108  	if err != nil {
   109  		t.Fatal(err)
   110  	}
   111  	status, err := job.Wait(ctx)
   112  	if err != nil {
   113  		t.Fatal(err)
   114  	}
   115  	if err := status.Err(); err != nil {
   116  		t.Fatal(err)
   117  	}
   118  	expectedRows := [][]Value{
   119  		{int64(1), "one"},
   120  		{int64(2), "two"},
   121  		{int64(3), "three"},
   122  	}
   123  	tableRowIt := dstTable.Read(ctx)
   124  	if err = checkRowsRead(tableRowIt, expectedRows); err != nil {
   125  		t.Fatalf("checkRowsRead(table): %v", err)
   126  	}
   127  	if !tableRowIt.IsAccelerated() {
   128  		t.Fatalf("reading from table should use Storage API")
   129  	}
   130  	jobRowIt, err := job.Read(ctx)
   131  	if err != nil {
   132  		t.Fatalf("ReadJobResults(job): %v", err)
   133  	}
   134  	if err = checkRowsRead(jobRowIt, expectedRows); err != nil {
   135  		t.Fatalf("checkRowsRead(job): %v", err)
   136  	}
   137  	if !jobRowIt.IsAccelerated() {
   138  		t.Fatalf("reading job should use Storage API")
   139  	}
   140  	q.Dst = nil
   141  	q.forceStorageAPI = true
   142  	qRowIt, err := q.Read(ctx)
   143  	if err != nil {
   144  		t.Fatalf("ReadQuery(query): %v", err)
   145  	}
   146  	if !qRowIt.IsAccelerated() {
   147  		t.Fatalf("reading query should use Storage API")
   148  	}
   149  	if err = checkRowsRead(qRowIt, expectedRows); err != nil {
   150  		t.Fatalf("checkRowsRead(query): %v", err)
   151  	}
   152  }
   153  
   154  func TestIntegration_StorageReadScriptJob(t *testing.T) {
   155  	if client == nil {
   156  		t.Skip("Integration tests skipped")
   157  	}
   158  	tableID := tableIDs.New()
   159  	ctx := context.Background()
   160  
   161  	sql := fmt.Sprintf(`
   162  -- Statement 0
   163  DECLARE x INT64;
   164  SET x = 4;
   165  -- Statement 1
   166  SELECT 1 as foo;
   167  -- Statement 2
   168  SELECT 1 as num, 'one' as str 
   169  UNION ALL 
   170  SELECT 2 as num, 'two' as str;
   171  -- Statement 3
   172  SELECT 1 as num, 'one' as str 
   173  UNION ALL 
   174  SELECT 2 as num, 'two' as str 
   175  UNION ALL 
   176  SELECT 3 as num, 'three' as str 
   177  UNION ALL 
   178  SELECT x as num, 'four' as str 
   179  ORDER BY num;
   180  -- Statement 4
   181  CREATE TABLE %s.%s ( num INT64, str STRING );
   182  -- Statement 5
   183  DROP TABLE %s.%s;
   184  `, dataset.DatasetID, tableID, dataset.DatasetID, tableID)
   185  	q := storageOptimizedClient.Query(sql)
   186  	q.forceStorageAPI = true
   187  	it, err := q.Read(ctx)
   188  	if err != nil {
   189  		t.Fatal(err)
   190  	}
   191  	expectedRows := [][]Value{
   192  		{int64(1), "one"},
   193  		{int64(2), "two"},
   194  		{int64(3), "three"},
   195  		{int64(4), "four"},
   196  	}
   197  	if err = checkRowsRead(it, expectedRows); err != nil {
   198  		t.Fatalf("checkRowsRead(it): %v", err)
   199  	}
   200  	if !it.IsAccelerated() {
   201  		t.Fatalf("reading job should use Storage API")
   202  	}
   203  }
   204  
   205  func TestIntegration_StorageReadQueryOrdering(t *testing.T) {
   206  	if client == nil {
   207  		t.Skip("Integration tests skipped")
   208  	}
   209  	ctx := context.Background()
   210  
   211  	table := "`bigquery-public-data.usa_names.usa_1910_current`"
   212  	testCases := []struct {
   213  		name               string
   214  		query              string
   215  		maxExpectedStreams int
   216  	}{
   217  		{
   218  			name:               "Non_Ordered_Query",
   219  			query:              fmt.Sprintf(`SELECT name, number, state FROM %s`, table),
   220  			maxExpectedStreams: -1, // No limit
   221  		},
   222  		{
   223  			name:               "Ordered_Query",
   224  			query:              fmt.Sprintf(`SELECT name, number, state FROM %s order by name`, table),
   225  			maxExpectedStreams: 1,
   226  		},
   227  	}
   228  
   229  	type S struct {
   230  		Name   string
   231  		Number int
   232  		State  string
   233  	}
   234  
   235  	for _, tc := range testCases {
   236  		q := storageOptimizedClient.Query(tc.query)
   237  		q.forceStorageAPI = true
   238  
   239  		it, err := q.Read(ctx)
   240  		if err != nil {
   241  			t.Fatal(err)
   242  		}
   243  
   244  		var firstValue S
   245  		err = it.Next(&firstValue)
   246  		if err != nil {
   247  			t.Fatal(err)
   248  		}
   249  
   250  		if cmp.Equal(firstValue, S{}) {
   251  			t.Fatalf("user defined struct was not filled with data")
   252  		}
   253  
   254  		total, err := countIteratorRows(it)
   255  		if err != nil {
   256  			t.Fatal(err)
   257  		}
   258  		total++ // as we read the first value separately
   259  
   260  		session := it.arrowIterator.(*storageArrowIterator).session
   261  		bqSession := session.bqSession
   262  		if len(bqSession.Streams) == 0 {
   263  			t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams))
   264  		}
   265  		streamSettings := session.settings.maxStreamCount
   266  		if tc.maxExpectedStreams > 0 {
   267  			if streamSettings > tc.maxExpectedStreams {
   268  				t.Fatalf("%s: expected stream settings to be at most %d streams but found %d", tc.name, tc.maxExpectedStreams, streamSettings)
   269  			}
   270  			if len(bqSession.Streams) > tc.maxExpectedStreams {
   271  				t.Fatalf("%s: expected server to set up at most %d streams but found %d", tc.name, tc.maxExpectedStreams, len(bqSession.Streams))
   272  			}
   273  		} else {
   274  			if streamSettings != 0 {
   275  				t.Fatalf("%s: expected stream settings to be 0 (server defines amount of stream) but found %d", tc.name, streamSettings)
   276  			}
   277  		}
   278  		if total != it.TotalRows {
   279  			t.Fatalf("%s: should have read %d rows, but read %d", tc.name, it.TotalRows, total)
   280  		}
   281  		if !it.IsAccelerated() {
   282  			t.Fatalf("%s: expected query to be accelerated by Storage API", tc.name)
   283  		}
   284  	}
   285  }
   286  
   287  func TestIntegration_StorageReadQueryStruct(t *testing.T) {
   288  	if client == nil {
   289  		t.Skip("Integration tests skipped")
   290  	}
   291  	ctx := context.Background()
   292  	table := "`bigquery-public-data.samples.wikipedia`"
   293  	sql := fmt.Sprintf(`SELECT id, title, timestamp, comment FROM %s LIMIT 1000`, table)
   294  	q := storageOptimizedClient.Query(sql)
   295  	q.forceStorageAPI = true
   296  	q.DisableQueryCache = true
   297  	it, err := q.Read(ctx)
   298  	if err != nil {
   299  		t.Fatal(err)
   300  	}
   301  	if !it.IsAccelerated() {
   302  		t.Fatal("expected query to use Storage API")
   303  	}
   304  
   305  	type S struct {
   306  		ID        int64
   307  		Title     string
   308  		Timestamp int64
   309  		Comment   NullString
   310  	}
   311  
   312  	total := uint64(0)
   313  	for {
   314  		var dst S
   315  		err := it.Next(&dst)
   316  		if err == iterator.Done {
   317  			break
   318  		}
   319  		if err != nil {
   320  			t.Fatalf("failed to fetch via storage API: %v", err)
   321  		}
   322  		if cmp.Equal(dst, S{}) {
   323  			t.Fatalf("user defined struct was not filled with data")
   324  		}
   325  		total++
   326  	}
   327  
   328  	bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession
   329  	if len(bqSession.Streams) == 0 {
   330  		t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams))
   331  	}
   332  	if total != it.TotalRows {
   333  		t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total)
   334  	}
   335  }
   336  
   337  func TestIntegration_StorageReadQueryMorePages(t *testing.T) {
   338  	if client == nil {
   339  		t.Skip("Integration tests skipped")
   340  	}
   341  	ctx := context.Background()
   342  	table := "`bigquery-public-data.samples.github_timeline`"
   343  	sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table)
   344  	// Don't forceStorageAPI usage and still see internally Storage API is selected
   345  	q := storageOptimizedClient.Query(sql)
   346  	q.DisableQueryCache = true
   347  	it, err := q.Read(ctx)
   348  	if err != nil {
   349  		t.Fatal(err)
   350  	}
   351  	if !it.IsAccelerated() {
   352  		t.Fatal("expected query to use Storage API")
   353  	}
   354  
   355  	type S struct {
   356  		URL   NullString
   357  		Owner NullString
   358  		Forks NullInt64
   359  	}
   360  
   361  	var firstValue S
   362  	err = it.Next(&firstValue)
   363  	if err != nil {
   364  		t.Fatal(err)
   365  	}
   366  
   367  	if cmp.Equal(firstValue, S{}) {
   368  		t.Fatalf("user defined struct was not filled with data")
   369  	}
   370  
   371  	total, err := countIteratorRows(it)
   372  	if err != nil {
   373  		t.Fatal(err)
   374  	}
   375  	total++ // as we read the first value separately
   376  
   377  	bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession
   378  	if len(bqSession.Streams) == 0 {
   379  		t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams))
   380  	}
   381  	if total != it.TotalRows {
   382  		t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total)
   383  	}
   384  }
   385  
   386  func TestIntegration_StorageReadCancel(t *testing.T) {
   387  	if client == nil {
   388  		t.Skip("Integration tests skipped")
   389  	}
   390  	ctx := context.Background()
   391  	ctx, cancel := context.WithCancel(ctx)
   392  	defer cancel()
   393  	table := "`bigquery-public-data.samples.github_timeline`"
   394  	sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table)
   395  	storageOptimizedClient.rc.settings.maxWorkerCount = 1
   396  	q := storageOptimizedClient.Query(sql)
   397  	q.DisableQueryCache = true
   398  	q.forceStorageAPI = true
   399  	it, err := q.Read(ctx)
   400  	if err != nil {
   401  		t.Fatal(err)
   402  	}
   403  	if !it.IsAccelerated() {
   404  		t.Fatal("expected query to use Storage API")
   405  	}
   406  
   407  	// Cancel read after readings 1000 rows
   408  	rowsRead := 0
   409  	for {
   410  		var dst []Value
   411  		err := it.Next(&dst)
   412  		if err == iterator.Done {
   413  			break
   414  		}
   415  		if err != nil {
   416  			if errors.Is(err, context.DeadlineExceeded) ||
   417  				errors.Is(err, context.Canceled) {
   418  				break
   419  			}
   420  			t.Fatalf("failed to fetch via storage API: %v", err)
   421  		}
   422  		rowsRead++
   423  		if rowsRead > 1000 {
   424  			cancel()
   425  		}
   426  	}
   427  	// resources are cleaned asynchronously
   428  	time.Sleep(time.Second)
   429  	arrowIt := it.arrowIterator.(*storageArrowIterator)
   430  	if !arrowIt.isDone() {
   431  		t.Fatal("expected stream to be done")
   432  	}
   433  }
   434  
   435  func TestIntegration_StorageReadArrow(t *testing.T) {
   436  	if client == nil {
   437  		t.Skip("Integration tests skipped")
   438  	}
   439  	ctx := context.Background()
   440  	table := "`bigquery-public-data.usa_names.usa_1910_current`"
   441  	sql := fmt.Sprintf(`SELECT name, number, state FROM %s where state = "CA"`, table)
   442  
   443  	q := storageOptimizedClient.Query(sql)
   444  	job, err := q.Run(ctx) // force usage of Storage API by skipping fast paths
   445  	if err != nil {
   446  		t.Fatal(err)
   447  	}
   448  	it, err := job.Read(ctx)
   449  	if err != nil {
   450  		t.Fatal(err)
   451  	}
   452  
   453  	checkedAllocator := memory.NewCheckedAllocator(memory.DefaultAllocator)
   454  	it.arrowDecoder.allocator = checkedAllocator
   455  	defer checkedAllocator.AssertSize(t, 0)
   456  
   457  	arrowIt, err := it.ArrowIterator()
   458  	if err != nil {
   459  		t.Fatalf("expected iterator to be accelerated: %v", err)
   460  	}
   461  	arrowItReader := NewArrowIteratorReader(arrowIt)
   462  
   463  	records := []arrow.Record{}
   464  	r, err := ipc.NewReader(arrowItReader, ipc.WithAllocator(checkedAllocator))
   465  	numrec := 0
   466  	for r.Next() {
   467  		rec := r.Record()
   468  		rec.Retain()
   469  		defer rec.Release()
   470  		records = append(records, rec)
   471  		numrec += int(rec.NumRows())
   472  	}
   473  	r.Release()
   474  
   475  	arrowSchema := r.Schema()
   476  	arrowTable := array.NewTableFromRecords(arrowSchema, records)
   477  	defer arrowTable.Release()
   478  	if arrowTable.NumRows() != int64(it.TotalRows) {
   479  		t.Fatalf("should have a table with %d rows, but found %d", it.TotalRows, arrowTable.NumRows())
   480  	}
   481  	if arrowTable.NumCols() != 3 {
   482  		t.Fatalf("should have a table with 3 columns, but found %d", arrowTable.NumCols())
   483  	}
   484  
   485  	sumSQL := fmt.Sprintf(`SELECT sum(number) as total FROM %s where state = "CA"`, table)
   486  	sumQuery := client.Query(sumSQL)
   487  	sumIt, err := sumQuery.Read(ctx)
   488  	if err != nil {
   489  		t.Fatal(err)
   490  	}
   491  	sumValues := []Value{}
   492  	err = sumIt.Next(&sumValues)
   493  	if err != nil {
   494  		t.Fatal(err)
   495  	}
   496  	totalFromSQL := sumValues[0].(int64)
   497  
   498  	tr := array.NewTableReader(arrowTable, arrowTable.NumRows())
   499  	defer tr.Release()
   500  	var totalFromArrow int64
   501  	for tr.Next() {
   502  		rec := tr.Record()
   503  		vec := rec.Column(1).(*array.Int64)
   504  		totalFromArrow += math.Int64.Sum(vec)
   505  	}
   506  	if totalFromArrow != totalFromSQL {
   507  		t.Fatalf("expected total to be %d, but with arrow we got %d", totalFromSQL, totalFromArrow)
   508  	}
   509  }
   510  
   511  func countIteratorRows(it *RowIterator) (total uint64, err error) {
   512  	for {
   513  		var dst []Value
   514  		err := it.Next(&dst)
   515  		if err == iterator.Done {
   516  			break
   517  		}
   518  		if err != nil {
   519  			return total, fmt.Errorf("failed to fetch via storage API: %w", err)
   520  		}
   521  		total++
   522  	}
   523  	return total, err
   524  }
   525  
   526  func checkRowsRead(it *RowIterator, expectedRows [][]Value) error {
   527  	if int(it.TotalRows) != len(expectedRows) {
   528  		return fmt.Errorf("expected %d rows, found %d", len(expectedRows), it.TotalRows)
   529  	}
   530  	for _, row := range expectedRows {
   531  		err := checkIteratorRead(it, row)
   532  		if err != nil {
   533  			return err
   534  		}
   535  	}
   536  	return nil
   537  }
   538  
   539  func checkIteratorRead(it *RowIterator, expectedRow []Value) error {
   540  	var outRow []Value
   541  	err := it.Next(&outRow)
   542  	if err == iterator.Done {
   543  		return nil
   544  	}
   545  	if err != nil {
   546  		return fmt.Errorf("failed to fetch via storage API: %v", err)
   547  	}
   548  	if len(outRow) != len(expectedRow) {
   549  		return fmt.Errorf("expected %d columns, but got %d", len(expectedRow), len(outRow))
   550  	}
   551  	if !testutil.Equal(outRow, expectedRow) {
   552  		return fmt.Errorf("got %v, want %v", outRow, expectedRow)
   553  	}
   554  	return nil
   555  }
   556  

View as plain text