...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "fmt"
20 "testing"
21
22 "google.golang.org/api/iterator"
23 )
24
25 func BenchmarkIntegration_StorageReadQuery(b *testing.B) {
26 if storageOptimizedClient == nil {
27 b.Skip("Integration tests skipped")
28 }
29 ctx := context.Background()
30 table := "`bigquery-public-data.usa_names.usa_1910_current`"
31 benchCases := []struct {
32 name string
33 filter string
34 }{
35 {name: "usa_1910_current_full", filter: ""},
36 {name: "usa_1910_current_state_eq_fl", filter: "where state = \"FL\""},
37 {name: "usa_1910_current_state_eq_ca", filter: "where state = \"CA\""},
38 {name: "usa_1910_current_full_ordered", filter: "order by name"},
39 }
40
41 type S struct {
42 Name string
43 Number int
44 State string
45 Nested struct {
46 Name string
47 N int
48 }
49 }
50
51 for _, bc := range benchCases {
52 sql := fmt.Sprintf(`SELECT name, number, state, STRUCT(name as name, number as n) as nested FROM %s %s`, table, bc.filter)
53 for _, maxStreamCount := range []int{0, 1} {
54 storageOptimizedClient.rc.settings.maxStreamCount = maxStreamCount
55 b.Run(fmt.Sprintf("storage_api_%d_max_streams_%s", maxStreamCount, bc.name), func(b *testing.B) {
56 for i := 0; i < b.N; i++ {
57 q := storageOptimizedClient.Query(sql)
58 q.forceStorageAPI = true
59 it, err := q.Read(ctx)
60 if err != nil {
61 b.Fatal(err)
62 }
63 if !it.IsAccelerated() {
64 b.Fatal("expected query execution to be accelerated")
65 }
66 for {
67 var s S
68 err := it.Next(&s)
69 if err == iterator.Done {
70 break
71 }
72 if err != nil {
73 b.Fatalf("failed to fetch via storage API: %v", err)
74 }
75 }
76 b.ReportMetric(float64(it.TotalRows), "rows")
77 bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession
78 b.ReportMetric(float64(len(bqSession.Streams)), "parallel_streams")
79 b.ReportMetric(float64(maxStreamCount), "max_streams")
80 }
81 })
82 }
83 b.Run(fmt.Sprintf("rest_api_%s", bc.name), func(b *testing.B) {
84 for i := 0; i < b.N; i++ {
85 q := client.Query(sql)
86 it, err := q.Read(ctx)
87 if err != nil {
88 b.Fatal(err)
89 }
90 for {
91 var s S
92 err := it.Next(&s)
93 if err == iterator.Done {
94 break
95 }
96 if err != nil {
97 b.Fatalf("failed to fetch via query API: %v", err)
98 }
99 }
100 b.ReportMetric(float64(it.TotalRows), "rows")
101 b.ReportMetric(1, "parallel_streams")
102 }
103 })
104 }
105 }
106
View as plain text