1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package main
19
import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"sort"
	"strings"
	"time"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)
35
36
// profileTag is one user-supplied key/value label (from the --tag flag) that
// is attached to every profile produced in a run. Empty Key or Value fields
// are left out of JSON output.
type profileTag struct {
	Key   string `json:"key,omitempty" bigquery:"key"`
	Value string `json:"value,omitempty" bigquery:"value"`
}

// tags accumulates repeated --tag flags. *tags satisfies flag.Value through
// its String and Set methods.
type tags []*profileTag
43
44 func (ts *tags) String() string {
45 var s strings.Builder
46 fp := len(*ts)
47 for i, t := range *ts {
48 s.WriteString(fmt.Sprintf("%s:%s", t.Key, t.Value))
49 if i < fp-1 {
50 s.WriteString(",")
51 }
52 }
53 return s.String()
54 }
55
56 func (ts *tags) Set(value string) error {
57 if value == "" {
58 return nil
59 }
60 parts := strings.SplitN(value, ":", 2)
61 if len(parts) == 2 {
62
63 *ts = append(*ts, &profileTag{Key: parts[0], Value: parts[1]})
64 } else {
65 *ts = append(*ts, &profileTag{Key: value})
66 }
67 return nil
68 }
69
70
71 func (ts *tags) AsSlice() []*profileTag {
72 var out []*profileTag
73 for _, v := range *ts {
74 out = append(out, v)
75 }
76 return out
77 }
78
79
80 type profiledQuery struct {
81
82 GroupName string `json:"groupname" bigquery:"groupname"`
83
84 Name string `json:"name" bigquery:"name"`
85
86 Tags []*profileTag `json:"tags" bigquery:"tags"`
87
88 Query *bigquery.Query `json:"-" bigquery:"-"`
89
90 SQL string
91
92 Runs []*timingInfo `json:"runs" bigquery:"runs"`
93
94 EventTime time.Time `json:"event_time" bigquery:"event_time"`
95 }
96
97
// timingInfo records the outcome of one execution of a benchmarked query, as
// filled in by measureSelectQuery. A timestamp left at its zero value means
// that phase was never reached, and Summary prints it as NODATA.
//
// encoding/json never treats a struct such as time.Time as empty, so the
// omitempty option on the time fields has no effect: zero times are still
// emitted.
type timingInfo struct {
	// ErrorString is the text of the error that ended the run, or "".
	ErrorString string `json:"errorstring,omitempty" bigquery:"errorstring"`

	// StartTime is taken immediately before the query is submitted.
	StartTime time.Time `json:"start_time,omitempty" bigquery:"start_time"`

	// QueryEndTime is taken as soon as the call that submits the query returns.
	QueryEndTime time.Time `json:"query_end_time,omitempty" bigquery:"query_end_time"`

	// FirstRowReturnedTime is taken after the first call to fetch a row returns.
	FirstRowReturnedTime time.Time `json:"first_row_returned_time,omitempty" bigquery:"first_row_returned_time"`

	// AllRowsReturnedTime is taken once the row iterator reports it is done.
	AllRowsReturnedTime time.Time `json:"all_rows_returned_time,omitempty" bigquery:"all_rows_returned_time"`

	// TotalRows is the number of rows read; it is only set on a complete read.
	TotalRows int64 `json:"total_rows,omitempty" bigquery:"total_rows"`
}
112
113
114 func (t *timingInfo) Summary() string {
115 noVal := "NODATA"
116 var buf bytes.Buffer
117 fmt.Fprintf(&buf, "QUERYTIME ")
118 if !t.QueryEndTime.IsZero() {
119 fmt.Fprintf(&buf, "%v", t.QueryEndTime.Sub(t.StartTime))
120 } else {
121 fmt.Fprintf(&buf, noVal)
122 }
123
124 fmt.Fprintf(&buf, " FIRSTROW ")
125 if !t.FirstRowReturnedTime.IsZero() {
126 fmt.Fprintf(&buf, "%v (+%v)", t.FirstRowReturnedTime.Sub(t.StartTime), t.FirstRowReturnedTime.Sub(t.QueryEndTime))
127 } else {
128 fmt.Fprintf(&buf, noVal)
129 }
130
131 fmt.Fprintf(&buf, " ALLROWS ")
132 if !t.AllRowsReturnedTime.IsZero() {
133 fmt.Fprintf(&buf, "%v (+%v)", t.AllRowsReturnedTime.Sub(t.StartTime), t.AllRowsReturnedTime.Sub(t.FirstRowReturnedTime))
134 } else {
135 fmt.Fprintf(&buf, noVal)
136 }
137 if t.TotalRows > 0 {
138 fmt.Fprintf(&buf, " ROWS %d", t.TotalRows)
139 }
140 if t.ErrorString != "" {
141 fmt.Fprintf(&buf, " ERRORED %s ", t.ErrorString)
142 }
143 return buf.String()
144 }
145
146
147
148
149 func measureSelectQuery(ctx context.Context, q *bigquery.Query) *timingInfo {
150 timing := &timingInfo{
151 StartTime: time.Now(),
152 }
153 it, err := q.Read(ctx)
154 timing.QueryEndTime = time.Now()
155 if err != nil {
156 timing.ErrorString = err.Error()
157 return timing
158 }
159 var row []bigquery.Value
160 var rowCount int64
161 for {
162 err := it.Next(&row)
163 if rowCount == 0 {
164 timing.FirstRowReturnedTime = time.Now()
165 }
166 if err == iterator.Done {
167 timing.AllRowsReturnedTime = time.Now()
168 timing.TotalRows = rowCount
169 break
170 }
171 if err != nil {
172 timing.ErrorString = err.Error()
173 return timing
174 }
175 rowCount++
176 }
177 return timing
178 }
179
180
181
182 func runBenchmarks(ctx context.Context, client *bigquery.Client, filename string, tags *tags, reruns int) (profiles []*profiledQuery, err error) {
183
184 queriesJSON, err := ioutil.ReadFile(filename)
185 if err != nil {
186 return nil, fmt.Errorf("failed to read queries files: %v", err)
187 }
188
189 var benchmarkInput map[string]map[string]string
190 if err := json.Unmarshal(queriesJSON, &benchmarkInput); err != nil {
191 return nil, fmt.Errorf("failed to unmarshall queries data: %v", err)
192 }
193
194 convertedTags := tags.AsSlice()
195
196 for groupName, m := range benchmarkInput {
197 for id, sql := range m {
198 prof := &profiledQuery{
199 GroupName: groupName,
200 Name: id,
201 SQL: sql,
202 Tags: convertedTags,
203 EventTime: time.Now(),
204 }
205 fmt.Printf("Measuring %s : %s", groupName, id)
206 query := client.Query(sql)
207 prof.Query = query
208
209 for i := 0; i < reruns; i++ {
210 fmt.Printf(".")
211 prof.Runs = append(prof.Runs, measureSelectQuery(ctx, query))
212 }
213 fmt.Println()
214 profiles = append(profiles, prof)
215 }
216 }
217 fmt.Println()
218 return profiles, nil
219 }
220
221
222 func printResults(queries []*profiledQuery) {
223 for i, prof := range queries {
224 fmt.Printf("%d: (%s:%s)\n", i, prof.GroupName, prof.Name)
225 fmt.Printf("SQL: %s\n", prof.Query.Q)
226 fmt.Printf("MEASUREMENTS\n")
227 for j, timing := range prof.Runs {
228 fmt.Printf("\t\t(%d) %s\n", j, timing.Summary())
229 }
230 fmt.Println()
231 }
232 }
233
234
235 func prepareTable(ctx context.Context, client *bigquery.Client, table string, create bool) (*bigquery.Table, error) {
236
237 parts := strings.Split(table, ".")
238 if len(parts) != 3 {
239 return nil, fmt.Errorf("Expected table in p.d.t format, got: %s", table)
240 }
241 tRef := client.DatasetInProject(parts[0], parts[1]).Table(parts[2])
242
243 _, err := tRef.Metadata(ctx)
244 if err != nil {
245 if create {
246 schema, err := bigquery.InferSchema(profiledQuery{})
247 if err != nil {
248 return nil, fmt.Errorf("could not infer schema while creating table: %v", err)
249 }
250 createMeta := &bigquery.TableMetadata{
251 Schema: schema.Relax(),
252 TimePartitioning: &bigquery.TimePartitioning{
253 Type: bigquery.DayPartitioningType,
254 Field: "event_time",
255 },
256 Clustering: &bigquery.Clustering{
257 Fields: []string{"groupname", "name"},
258 },
259 }
260 if err2 := tRef.Create(ctx, createMeta); err2 != nil {
261 return nil, fmt.Errorf("could not create table: %v", err2)
262 }
263 return tRef, nil
264 }
265 return nil, fmt.Errorf("error while validating table existence: %v", err)
266 }
267 return tRef, nil
268 }
269
270
271 func reportResults(ctx context.Context, client *bigquery.Client, table *bigquery.Table, results []*profiledQuery) error {
272 inserter := table.Inserter()
273
274
275 ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
276 defer cancel()
277 if err := inserter.Put(ctx, results); err != nil {
278 return fmt.Errorf("reportResults: %v", err)
279 }
280 return nil
281 }
282
283 func main() {
284 var reruns = flag.Int("reruns", 3, "number of reruns to issue for each query")
285 var queryfile = flag.String("queryfile", "benchmarked-queries.json", "path to file contain queries to be benchmarked.")
286 var projectID = flag.String("projectid", "", "project ID to use for running benchmarks. Uses GOOGLE_CLOUD_PROJECT env if not set.")
287 var reportTable = flag.String("table", "", "table to stream results into, specified in project.dataset.table format")
288 var createTable = flag.Bool("create_table", false, "create result table if it does not exist")
289
290 var tags tags
291 flag.Var(&tags, "tag", "an optional key and value seperated by colon (:) character")
292 flag.Parse()
293
294
295 if *reruns <= 0 {
296 log.Fatalf("--reruns should be a positive value")
297 }
298 projID := os.Getenv("GOOGLE_CLOUD_PROJECT")
299 if *projectID != "" {
300 projID = *projectID
301 }
302 if projID == "" {
303 log.Fatalf("must provide --projectid or set GOOGLE_CLOUD_PROJECT environment variable")
304 }
305
306
307 ctx := context.Background()
308 client, err := bigquery.NewClient(ctx, projID)
309 if err != nil {
310 log.Fatalf("bigquery.NewClient: %v", err)
311 }
312 defer client.Close()
313
314
315 var table *bigquery.Table
316 if *reportTable != "" {
317 table, err = prepareTable(ctx, client, *reportTable, *createTable)
318 if err != nil {
319 log.Fatalf("prepareTable: %v", err)
320 }
321 }
322 start := time.Now()
323 profiles, err := runBenchmarks(ctx, client, *queryfile, &tags, *reruns)
324 if err != nil {
325 log.Fatalf("runBenchmarks: %v", err)
326 }
327 fmt.Printf("measurement time: %v\n\n", time.Now().Sub(start))
328 if table != nil {
329 if err := reportResults(ctx, client, table, profiles); err != nil {
330 log.Fatalf("reportResults: %v", err)
331 }
332 }
333 printResults(profiles)
334 }
335
View as plain text