1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "testing"
22 "time"
23
24 "cloud.google.com/go/internal/testutil"
25 "github.com/apache/arrow/go/v15/arrow"
26 "github.com/apache/arrow/go/v15/arrow/array"
27 "github.com/apache/arrow/go/v15/arrow/ipc"
28 "github.com/apache/arrow/go/v15/arrow/math"
29 "github.com/apache/arrow/go/v15/arrow/memory"
30 "github.com/google/go-cmp/cmp"
31 "google.golang.org/api/iterator"
32 )
33
34 func TestIntegration_StorageReadBasicTypes(t *testing.T) {
35 if client == nil {
36 t.Skip("Integration tests skipped")
37 }
38 ctx := context.Background()
39
40 initQueryParameterTestCases()
41
42 for _, c := range queryParameterTestCases {
43 t.Run(c.name, func(t *testing.T) {
44 q := storageOptimizedClient.Query(c.query)
45 q.Parameters = c.parameters
46 q.forceStorageAPI = true
47 it, err := q.Read(ctx)
48 if err != nil {
49 t.Fatal(err)
50 }
51 err = checkIteratorRead(it, c.wantRow)
52 if err != nil {
53 t.Fatalf("%s: error on query `%s`[%v]: %v", it.SourceJob().ID(), c.query, c.parameters, err)
54 }
55 if !it.IsAccelerated() {
56 t.Fatalf("%s: expected storage api to be used", it.SourceJob().ID())
57 }
58 })
59 }
60 }
61
62 func TestIntegration_StorageReadEmptyResultSet(t *testing.T) {
63 if client == nil {
64 t.Skip("Integration tests skipped")
65 }
66 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
67 defer cancel()
68
69 table := storageOptimizedClient.Dataset(dataset.DatasetID).Table(tableIDs.New())
70 err := table.Create(ctx, &TableMetadata{
71 Schema: Schema{
72 {Name: "name", Type: StringFieldType, Required: true},
73 },
74 })
75 if err != nil {
76 t.Fatal(err)
77 }
78 defer table.Delete(ctx)
79
80 it := table.Read(ctx)
81 err = checkIteratorRead(it, []Value{})
82 if err != nil {
83 t.Fatalf("failed to read empty table: %v", err)
84 }
85 if !it.IsAccelerated() {
86 t.Fatal("expected storage api to be used")
87 }
88 }
89
90 func TestIntegration_StorageReadFromSources(t *testing.T) {
91 if client == nil {
92 t.Skip("Integration tests skipped")
93 }
94 ctx := context.Background()
95
96 dstTable := dataset.Table(tableIDs.New())
97 dstTable.c = storageOptimizedClient
98
99 sql := `SELECT 1 as num, 'one' as str
100 UNION ALL
101 SELECT 2 as num, 'two' as str
102 UNION ALL
103 SELECT 3 as num, 'three' as str
104 ORDER BY num`
105 q := storageOptimizedClient.Query(sql)
106 q.Dst = dstTable
107 job, err := q.Run(ctx)
108 if err != nil {
109 t.Fatal(err)
110 }
111 status, err := job.Wait(ctx)
112 if err != nil {
113 t.Fatal(err)
114 }
115 if err := status.Err(); err != nil {
116 t.Fatal(err)
117 }
118 expectedRows := [][]Value{
119 {int64(1), "one"},
120 {int64(2), "two"},
121 {int64(3), "three"},
122 }
123 tableRowIt := dstTable.Read(ctx)
124 if err = checkRowsRead(tableRowIt, expectedRows); err != nil {
125 t.Fatalf("checkRowsRead(table): %v", err)
126 }
127 if !tableRowIt.IsAccelerated() {
128 t.Fatalf("reading from table should use Storage API")
129 }
130 jobRowIt, err := job.Read(ctx)
131 if err != nil {
132 t.Fatalf("ReadJobResults(job): %v", err)
133 }
134 if err = checkRowsRead(jobRowIt, expectedRows); err != nil {
135 t.Fatalf("checkRowsRead(job): %v", err)
136 }
137 if !jobRowIt.IsAccelerated() {
138 t.Fatalf("reading job should use Storage API")
139 }
140 q.Dst = nil
141 q.forceStorageAPI = true
142 qRowIt, err := q.Read(ctx)
143 if err != nil {
144 t.Fatalf("ReadQuery(query): %v", err)
145 }
146 if !qRowIt.IsAccelerated() {
147 t.Fatalf("reading query should use Storage API")
148 }
149 if err = checkRowsRead(qRowIt, expectedRows); err != nil {
150 t.Fatalf("checkRowsRead(query): %v", err)
151 }
152 }
153
154 func TestIntegration_StorageReadScriptJob(t *testing.T) {
155 if client == nil {
156 t.Skip("Integration tests skipped")
157 }
158 tableID := tableIDs.New()
159 ctx := context.Background()
160
161 sql := fmt.Sprintf(`
162 -- Statement 0
163 DECLARE x INT64;
164 SET x = 4;
165 -- Statement 1
166 SELECT 1 as foo;
167 -- Statement 2
168 SELECT 1 as num, 'one' as str
169 UNION ALL
170 SELECT 2 as num, 'two' as str;
171 -- Statement 3
172 SELECT 1 as num, 'one' as str
173 UNION ALL
174 SELECT 2 as num, 'two' as str
175 UNION ALL
176 SELECT 3 as num, 'three' as str
177 UNION ALL
178 SELECT x as num, 'four' as str
179 ORDER BY num;
180 -- Statement 4
181 CREATE TABLE %s.%s ( num INT64, str STRING );
182 -- Statement 5
183 DROP TABLE %s.%s;
184 `, dataset.DatasetID, tableID, dataset.DatasetID, tableID)
185 q := storageOptimizedClient.Query(sql)
186 q.forceStorageAPI = true
187 it, err := q.Read(ctx)
188 if err != nil {
189 t.Fatal(err)
190 }
191 expectedRows := [][]Value{
192 {int64(1), "one"},
193 {int64(2), "two"},
194 {int64(3), "three"},
195 {int64(4), "four"},
196 }
197 if err = checkRowsRead(it, expectedRows); err != nil {
198 t.Fatalf("checkRowsRead(it): %v", err)
199 }
200 if !it.IsAccelerated() {
201 t.Fatalf("reading job should use Storage API")
202 }
203 }
204
205 func TestIntegration_StorageReadQueryOrdering(t *testing.T) {
206 if client == nil {
207 t.Skip("Integration tests skipped")
208 }
209 ctx := context.Background()
210
211 table := "`bigquery-public-data.usa_names.usa_1910_current`"
212 testCases := []struct {
213 name string
214 query string
215 maxExpectedStreams int
216 }{
217 {
218 name: "Non_Ordered_Query",
219 query: fmt.Sprintf(`SELECT name, number, state FROM %s`, table),
220 maxExpectedStreams: -1,
221 },
222 {
223 name: "Ordered_Query",
224 query: fmt.Sprintf(`SELECT name, number, state FROM %s order by name`, table),
225 maxExpectedStreams: 1,
226 },
227 }
228
229 type S struct {
230 Name string
231 Number int
232 State string
233 }
234
235 for _, tc := range testCases {
236 q := storageOptimizedClient.Query(tc.query)
237 q.forceStorageAPI = true
238
239 it, err := q.Read(ctx)
240 if err != nil {
241 t.Fatal(err)
242 }
243
244 var firstValue S
245 err = it.Next(&firstValue)
246 if err != nil {
247 t.Fatal(err)
248 }
249
250 if cmp.Equal(firstValue, S{}) {
251 t.Fatalf("user defined struct was not filled with data")
252 }
253
254 total, err := countIteratorRows(it)
255 if err != nil {
256 t.Fatal(err)
257 }
258 total++
259
260 session := it.arrowIterator.(*storageArrowIterator).session
261 bqSession := session.bqSession
262 if len(bqSession.Streams) == 0 {
263 t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams))
264 }
265 streamSettings := session.settings.maxStreamCount
266 if tc.maxExpectedStreams > 0 {
267 if streamSettings > tc.maxExpectedStreams {
268 t.Fatalf("%s: expected stream settings to be at most %d streams but found %d", tc.name, tc.maxExpectedStreams, streamSettings)
269 }
270 if len(bqSession.Streams) > tc.maxExpectedStreams {
271 t.Fatalf("%s: expected server to set up at most %d streams but found %d", tc.name, tc.maxExpectedStreams, len(bqSession.Streams))
272 }
273 } else {
274 if streamSettings != 0 {
275 t.Fatalf("%s: expected stream settings to be 0 (server defines amount of stream) but found %d", tc.name, streamSettings)
276 }
277 }
278 if total != it.TotalRows {
279 t.Fatalf("%s: should have read %d rows, but read %d", tc.name, it.TotalRows, total)
280 }
281 if !it.IsAccelerated() {
282 t.Fatalf("%s: expected query to be accelerated by Storage API", tc.name)
283 }
284 }
285 }
286
287 func TestIntegration_StorageReadQueryStruct(t *testing.T) {
288 if client == nil {
289 t.Skip("Integration tests skipped")
290 }
291 ctx := context.Background()
292 table := "`bigquery-public-data.samples.wikipedia`"
293 sql := fmt.Sprintf(`SELECT id, title, timestamp, comment FROM %s LIMIT 1000`, table)
294 q := storageOptimizedClient.Query(sql)
295 q.forceStorageAPI = true
296 q.DisableQueryCache = true
297 it, err := q.Read(ctx)
298 if err != nil {
299 t.Fatal(err)
300 }
301 if !it.IsAccelerated() {
302 t.Fatal("expected query to use Storage API")
303 }
304
305 type S struct {
306 ID int64
307 Title string
308 Timestamp int64
309 Comment NullString
310 }
311
312 total := uint64(0)
313 for {
314 var dst S
315 err := it.Next(&dst)
316 if err == iterator.Done {
317 break
318 }
319 if err != nil {
320 t.Fatalf("failed to fetch via storage API: %v", err)
321 }
322 if cmp.Equal(dst, S{}) {
323 t.Fatalf("user defined struct was not filled with data")
324 }
325 total++
326 }
327
328 bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession
329 if len(bqSession.Streams) == 0 {
330 t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams))
331 }
332 if total != it.TotalRows {
333 t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total)
334 }
335 }
336
337 func TestIntegration_StorageReadQueryMorePages(t *testing.T) {
338 if client == nil {
339 t.Skip("Integration tests skipped")
340 }
341 ctx := context.Background()
342 table := "`bigquery-public-data.samples.github_timeline`"
343 sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table)
344
345 q := storageOptimizedClient.Query(sql)
346 q.DisableQueryCache = true
347 it, err := q.Read(ctx)
348 if err != nil {
349 t.Fatal(err)
350 }
351 if !it.IsAccelerated() {
352 t.Fatal("expected query to use Storage API")
353 }
354
355 type S struct {
356 URL NullString
357 Owner NullString
358 Forks NullInt64
359 }
360
361 var firstValue S
362 err = it.Next(&firstValue)
363 if err != nil {
364 t.Fatal(err)
365 }
366
367 if cmp.Equal(firstValue, S{}) {
368 t.Fatalf("user defined struct was not filled with data")
369 }
370
371 total, err := countIteratorRows(it)
372 if err != nil {
373 t.Fatal(err)
374 }
375 total++
376
377 bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession
378 if len(bqSession.Streams) == 0 {
379 t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams))
380 }
381 if total != it.TotalRows {
382 t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total)
383 }
384 }
385
386 func TestIntegration_StorageReadCancel(t *testing.T) {
387 if client == nil {
388 t.Skip("Integration tests skipped")
389 }
390 ctx := context.Background()
391 ctx, cancel := context.WithCancel(ctx)
392 defer cancel()
393 table := "`bigquery-public-data.samples.github_timeline`"
394 sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table)
395 storageOptimizedClient.rc.settings.maxWorkerCount = 1
396 q := storageOptimizedClient.Query(sql)
397 q.DisableQueryCache = true
398 q.forceStorageAPI = true
399 it, err := q.Read(ctx)
400 if err != nil {
401 t.Fatal(err)
402 }
403 if !it.IsAccelerated() {
404 t.Fatal("expected query to use Storage API")
405 }
406
407
408 rowsRead := 0
409 for {
410 var dst []Value
411 err := it.Next(&dst)
412 if err == iterator.Done {
413 break
414 }
415 if err != nil {
416 if errors.Is(err, context.DeadlineExceeded) ||
417 errors.Is(err, context.Canceled) {
418 break
419 }
420 t.Fatalf("failed to fetch via storage API: %v", err)
421 }
422 rowsRead++
423 if rowsRead > 1000 {
424 cancel()
425 }
426 }
427
428 time.Sleep(time.Second)
429 arrowIt := it.arrowIterator.(*storageArrowIterator)
430 if !arrowIt.isDone() {
431 t.Fatal("expected stream to be done")
432 }
433 }
434
435 func TestIntegration_StorageReadArrow(t *testing.T) {
436 if client == nil {
437 t.Skip("Integration tests skipped")
438 }
439 ctx := context.Background()
440 table := "`bigquery-public-data.usa_names.usa_1910_current`"
441 sql := fmt.Sprintf(`SELECT name, number, state FROM %s where state = "CA"`, table)
442
443 q := storageOptimizedClient.Query(sql)
444 job, err := q.Run(ctx)
445 if err != nil {
446 t.Fatal(err)
447 }
448 it, err := job.Read(ctx)
449 if err != nil {
450 t.Fatal(err)
451 }
452
453 checkedAllocator := memory.NewCheckedAllocator(memory.DefaultAllocator)
454 it.arrowDecoder.allocator = checkedAllocator
455 defer checkedAllocator.AssertSize(t, 0)
456
457 arrowIt, err := it.ArrowIterator()
458 if err != nil {
459 t.Fatalf("expected iterator to be accelerated: %v", err)
460 }
461 arrowItReader := NewArrowIteratorReader(arrowIt)
462
463 records := []arrow.Record{}
464 r, err := ipc.NewReader(arrowItReader, ipc.WithAllocator(checkedAllocator))
465 numrec := 0
466 for r.Next() {
467 rec := r.Record()
468 rec.Retain()
469 defer rec.Release()
470 records = append(records, rec)
471 numrec += int(rec.NumRows())
472 }
473 r.Release()
474
475 arrowSchema := r.Schema()
476 arrowTable := array.NewTableFromRecords(arrowSchema, records)
477 defer arrowTable.Release()
478 if arrowTable.NumRows() != int64(it.TotalRows) {
479 t.Fatalf("should have a table with %d rows, but found %d", it.TotalRows, arrowTable.NumRows())
480 }
481 if arrowTable.NumCols() != 3 {
482 t.Fatalf("should have a table with 3 columns, but found %d", arrowTable.NumCols())
483 }
484
485 sumSQL := fmt.Sprintf(`SELECT sum(number) as total FROM %s where state = "CA"`, table)
486 sumQuery := client.Query(sumSQL)
487 sumIt, err := sumQuery.Read(ctx)
488 if err != nil {
489 t.Fatal(err)
490 }
491 sumValues := []Value{}
492 err = sumIt.Next(&sumValues)
493 if err != nil {
494 t.Fatal(err)
495 }
496 totalFromSQL := sumValues[0].(int64)
497
498 tr := array.NewTableReader(arrowTable, arrowTable.NumRows())
499 defer tr.Release()
500 var totalFromArrow int64
501 for tr.Next() {
502 rec := tr.Record()
503 vec := rec.Column(1).(*array.Int64)
504 totalFromArrow += math.Int64.Sum(vec)
505 }
506 if totalFromArrow != totalFromSQL {
507 t.Fatalf("expected total to be %d, but with arrow we got %d", totalFromSQL, totalFromArrow)
508 }
509 }
510
511 func countIteratorRows(it *RowIterator) (total uint64, err error) {
512 for {
513 var dst []Value
514 err := it.Next(&dst)
515 if err == iterator.Done {
516 break
517 }
518 if err != nil {
519 return total, fmt.Errorf("failed to fetch via storage API: %w", err)
520 }
521 total++
522 }
523 return total, err
524 }
525
526 func checkRowsRead(it *RowIterator, expectedRows [][]Value) error {
527 if int(it.TotalRows) != len(expectedRows) {
528 return fmt.Errorf("expected %d rows, found %d", len(expectedRows), it.TotalRows)
529 }
530 for _, row := range expectedRows {
531 err := checkIteratorRead(it, row)
532 if err != nil {
533 return err
534 }
535 }
536 return nil
537 }
538
539 func checkIteratorRead(it *RowIterator, expectedRow []Value) error {
540 var outRow []Value
541 err := it.Next(&outRow)
542 if err == iterator.Done {
543 return nil
544 }
545 if err != nil {
546 return fmt.Errorf("failed to fetch via storage API: %v", err)
547 }
548 if len(outRow) != len(expectedRow) {
549 return fmt.Errorf("expected %d columns, but got %d", len(expectedRow), len(outRow))
550 }
551 if !testutil.Equal(outRow, expectedRow) {
552 return fmt.Errorf("got %v, want %v", outRow, expectedRow)
553 }
554 return nil
555 }
556
View as plain text