...

Source file src/cloud.google.com/go/bigquery/benchmarks/bench.go

Documentation: cloud.google.com/go/bigquery/benchmarks

     1  // Copyright 2017 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  //go:build ignore
    16  // +build ignore
    17  
    18  package main
    19  
    20  import (
    21  	"bytes"
    22  	"context"
    23  	"encoding/json"
    24  	"flag"
    25  	"fmt"
    26  	"io/ioutil"
    27  	"log"
    28  	"os"
    29  	"strings"
    30  	"time"
    31  
    32  	"cloud.google.com/go/bigquery"
    33  	"google.golang.org/api/iterator"
    34  )
    35  
    36  // profileTag is a simple annotation for benchmark runs.
    37  type profileTag struct {
    38  	Key   string `json:"key,omitempty" bigquery:"key"`
    39  	Value string `json:"value,omitempty" bigquery:"value"`
    40  }
    41  
    42  type tags []*profileTag
    43  
    44  func (ts *tags) String() string {
    45  	var s strings.Builder
    46  	fp := len(*ts)
    47  	for i, t := range *ts {
    48  		s.WriteString(fmt.Sprintf("%s:%s", t.Key, t.Value))
    49  		if i < fp-1 {
    50  			s.WriteString(",")
    51  		}
    52  	}
    53  	return s.String()
    54  }
    55  
    56  func (ts *tags) Set(value string) error {
    57  	if value == "" {
    58  		return nil
    59  	}
    60  	parts := strings.SplitN(value, ":", 2)
    61  	if len(parts) == 2 {
    62  		// both a key and value
    63  		*ts = append(*ts, &profileTag{Key: parts[0], Value: parts[1]})
    64  	} else {
    65  		*ts = append(*ts, &profileTag{Key: value})
    66  	}
    67  	return nil
    68  }
    69  
    70  // AsSlice is used to simplify schema inference.
    71  func (ts *tags) AsSlice() []*profileTag {
    72  	var out []*profileTag
    73  	for _, v := range *ts {
    74  		out = append(out, v)
    75  	}
    76  	return out
    77  }
    78  
    79  // profiledQuery provides metadata about query invocations and performance.
    80  type profiledQuery struct {
    81  	// Used to describe a set of related queries.
    82  	GroupName string `json:"groupname" bigquery:"groupname"`
    83  	// User to describe a single query configuration.
    84  	Name string `json:"name" bigquery:"name"`
    85  	// Tags allow an arbitrary list of KV pairs for denoting specifics of a profile.
    86  	Tags []*profileTag `json:"tags" bigquery:"tags"`
    87  	// Persisted query configuration.
    88  	Query *bigquery.Query `json:"-" bigquery:"-"`
    89  	// Just the query string.
    90  	SQL string
    91  	// Timing details from multiple invocations.
    92  	Runs []*timingInfo `json:"runs" bigquery:"runs"`
    93  	// When this data was logged.
    94  	EventTime time.Time `json:"event_time" bigquery:"event_time"`
    95  }
    96  
    97  // timingInfo provides measurements for a single invocation of a query.
    98  type timingInfo struct {
    99  	// If the query failed in error, this retains a copy of the error string
   100  	ErrorString string `json:"errorstring,omitempty" bigquery:"errorstring"`
   101  	// Start time from the client perspective, e.q. calling Read() to insert and wait for an iterator
   102  	StartTime time.Time `json:"start_time,omitempty" bigquery:"start_time"`
   103  	// Measured when the Read() call returns.
   104  	QueryEndTime time.Time `json:"query_end_time,omitempty" bigquery:"query_end_time"`
   105  	// Measured when consumer receives the first row via the iterator.
   106  	FirstRowReturnedTime time.Time `json:"first_row_returned_time,omitempty" bigquery:"first_row_returned_time"`
   107  	// Measured when consumer receives iterator.Done
   108  	AllRowsReturnedTime time.Time `json:"all_rows_returned_time,omitempty" bigquery:"all_rows_returned_time"`
   109  	// Number of rows fetched through the iterator.
   110  	TotalRows int64 `json:"total_rows,omitempty" bigquery:"total_rows"`
   111  }
   112  
   113  // Summary provides a human-readable string that summarizes the significant timing details.
   114  func (t *timingInfo) Summary() string {
   115  	noVal := "NODATA"
   116  	var buf bytes.Buffer
   117  	fmt.Fprintf(&buf, "QUERYTIME ")
   118  	if !t.QueryEndTime.IsZero() {
   119  		fmt.Fprintf(&buf, "%v", t.QueryEndTime.Sub(t.StartTime))
   120  	} else {
   121  		fmt.Fprintf(&buf, noVal)
   122  	}
   123  
   124  	fmt.Fprintf(&buf, " FIRSTROW ")
   125  	if !t.FirstRowReturnedTime.IsZero() {
   126  		fmt.Fprintf(&buf, "%v (+%v)", t.FirstRowReturnedTime.Sub(t.StartTime), t.FirstRowReturnedTime.Sub(t.QueryEndTime))
   127  	} else {
   128  		fmt.Fprintf(&buf, noVal)
   129  	}
   130  
   131  	fmt.Fprintf(&buf, " ALLROWS ")
   132  	if !t.AllRowsReturnedTime.IsZero() {
   133  		fmt.Fprintf(&buf, "%v (+%v)", t.AllRowsReturnedTime.Sub(t.StartTime), t.AllRowsReturnedTime.Sub(t.FirstRowReturnedTime))
   134  	} else {
   135  		fmt.Fprintf(&buf, noVal)
   136  	}
   137  	if t.TotalRows > 0 {
   138  		fmt.Fprintf(&buf, " ROWS %d", t.TotalRows)
   139  	}
   140  	if t.ErrorString != "" {
   141  		fmt.Fprintf(&buf, " ERRORED %s ", t.ErrorString)
   142  	}
   143  	return buf.String()
   144  }
   145  
   146  // measureSelectQuery invokes a query given a config and returns timing information.
   147  //
   148  // This instrumentation is meant for the common query case.
   149  func measureSelectQuery(ctx context.Context, q *bigquery.Query) *timingInfo {
   150  	timing := &timingInfo{
   151  		StartTime: time.Now(),
   152  	}
   153  	it, err := q.Read(ctx)
   154  	timing.QueryEndTime = time.Now()
   155  	if err != nil {
   156  		timing.ErrorString = err.Error()
   157  		return timing
   158  	}
   159  	var row []bigquery.Value
   160  	var rowCount int64
   161  	for {
   162  		err := it.Next(&row)
   163  		if rowCount == 0 {
   164  			timing.FirstRowReturnedTime = time.Now()
   165  		}
   166  		if err == iterator.Done {
   167  			timing.AllRowsReturnedTime = time.Now()
   168  			timing.TotalRows = rowCount
   169  			break
   170  		}
   171  		if err != nil {
   172  			timing.ErrorString = err.Error()
   173  			return timing
   174  		}
   175  		rowCount++
   176  	}
   177  	return timing
   178  }
   179  
   180  // runBenchmarks processes the input file and instruments the queries.
   181  // It currently instruments queries serially to reduce variance due to concurrent execution on either the backend or in this client.
   182  func runBenchmarks(ctx context.Context, client *bigquery.Client, filename string, tags *tags, reruns int) (profiles []*profiledQuery, err error) {
   183  
   184  	queriesJSON, err := ioutil.ReadFile(filename)
   185  	if err != nil {
   186  		return nil, fmt.Errorf("failed to read queries files: %v", err)
   187  	}
   188  
   189  	var benchmarkInput map[string]map[string]string
   190  	if err := json.Unmarshal(queriesJSON, &benchmarkInput); err != nil {
   191  		return nil, fmt.Errorf("failed to unmarshall queries data: %v", err)
   192  	}
   193  
   194  	convertedTags := tags.AsSlice()
   195  
   196  	for groupName, m := range benchmarkInput {
   197  		for id, sql := range m {
   198  			prof := &profiledQuery{
   199  				GroupName: groupName,
   200  				Name:      id,
   201  				SQL:       sql,
   202  				Tags:      convertedTags,
   203  				EventTime: time.Now(),
   204  			}
   205  			fmt.Printf("Measuring %s : %s", groupName, id)
   206  			query := client.Query(sql)
   207  			prof.Query = query
   208  
   209  			for i := 0; i < reruns; i++ {
   210  				fmt.Printf(".")
   211  				prof.Runs = append(prof.Runs, measureSelectQuery(ctx, query))
   212  			}
   213  			fmt.Println()
   214  			profiles = append(profiles, prof)
   215  		}
   216  	}
   217  	fmt.Println()
   218  	return profiles, nil
   219  }
   220  
   221  // printResults prints information about collected query profiles.
   222  func printResults(queries []*profiledQuery) {
   223  	for i, prof := range queries {
   224  		fmt.Printf("%d: (%s:%s)\n", i, prof.GroupName, prof.Name)
   225  		fmt.Printf("SQL: %s\n", prof.Query.Q)
   226  		fmt.Printf("MEASUREMENTS\n")
   227  		for j, timing := range prof.Runs {
   228  			fmt.Printf("\t\t(%d) %s\n", j, timing.Summary())
   229  		}
   230  		fmt.Println()
   231  	}
   232  }
   233  
   234  // prepareTable ensures a table exists, and optionally creates it if directed
   235  func prepareTable(ctx context.Context, client *bigquery.Client, table string, create bool) (*bigquery.Table, error) {
   236  	// Ensure table exists before streaming results, and possibly create it if directed.
   237  	parts := strings.Split(table, ".")
   238  	if len(parts) != 3 {
   239  		return nil, fmt.Errorf("Expected table in p.d.t format, got: %s", table)
   240  	}
   241  	tRef := client.DatasetInProject(parts[0], parts[1]).Table(parts[2])
   242  	// check with backend
   243  	_, err := tRef.Metadata(ctx)
   244  	if err != nil {
   245  		if create {
   246  			schema, err := bigquery.InferSchema(profiledQuery{})
   247  			if err != nil {
   248  				return nil, fmt.Errorf("could not infer schema while creating table: %v", err)
   249  			}
   250  			createMeta := &bigquery.TableMetadata{
   251  				Schema: schema.Relax(),
   252  				TimePartitioning: &bigquery.TimePartitioning{
   253  					Type:  bigquery.DayPartitioningType,
   254  					Field: "event_time",
   255  				},
   256  				Clustering: &bigquery.Clustering{
   257  					Fields: []string{"groupname", "name"},
   258  				},
   259  			}
   260  			if err2 := tRef.Create(ctx, createMeta); err2 != nil {
   261  				return nil, fmt.Errorf("could not create table: %v", err2)
   262  			}
   263  			return tRef, nil
   264  		}
   265  		return nil, fmt.Errorf("error while validating table existence: %v", err)
   266  	}
   267  	return tRef, nil
   268  }
   269  
   270  // reportResults streams results into the designated table.
   271  func reportResults(ctx context.Context, client *bigquery.Client, table *bigquery.Table, results []*profiledQuery) error {
   272  	inserter := table.Inserter()
   273  
   274  	// Set a timeout on our context to bound retries
   275  	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
   276  	defer cancel()
   277  	if err := inserter.Put(ctx, results); err != nil {
   278  		return fmt.Errorf("reportResults: %v", err)
   279  	}
   280  	return nil
   281  }
   282  
   283  func main() {
   284  	var reruns = flag.Int("reruns", 3, "number of reruns to issue for each query")
   285  	var queryfile = flag.String("queryfile", "benchmarked-queries.json", "path to file contain queries to be benchmarked.")
   286  	var projectID = flag.String("projectid", "", "project ID to use for running benchmarks.  Uses GOOGLE_CLOUD_PROJECT env if not set.")
   287  	var reportTable = flag.String("table", "", "table to stream results into, specified in project.dataset.table format")
   288  	var createTable = flag.Bool("create_table", false, "create result table if it does not exist")
   289  
   290  	var tags tags
   291  	flag.Var(&tags, "tag", "an optional key and value seperated by colon (:) character")
   292  	flag.Parse()
   293  
   294  	// Validate flags.
   295  	if *reruns <= 0 {
   296  		log.Fatalf("--reruns should be a positive value")
   297  	}
   298  	projID := os.Getenv("GOOGLE_CLOUD_PROJECT")
   299  	if *projectID != "" {
   300  		projID = *projectID
   301  	}
   302  	if projID == "" {
   303  		log.Fatalf("must provide --projectid or set GOOGLE_CLOUD_PROJECT environment variable")
   304  	}
   305  
   306  	// Setup context and client based on ADC.
   307  	ctx := context.Background()
   308  	client, err := bigquery.NewClient(ctx, projID)
   309  	if err != nil {
   310  		log.Fatalf("bigquery.NewClient: %v", err)
   311  	}
   312  	defer client.Close()
   313  
   314  	// If we're going to stream results, let's make sure we can do that before running all the tests.
   315  	var table *bigquery.Table
   316  	if *reportTable != "" {
   317  		table, err = prepareTable(ctx, client, *reportTable, *createTable)
   318  		if err != nil {
   319  			log.Fatalf("prepareTable: %v", err)
   320  		}
   321  	}
   322  	start := time.Now()
   323  	profiles, err := runBenchmarks(ctx, client, *queryfile, &tags, *reruns)
   324  	if err != nil {
   325  		log.Fatalf("runBenchmarks: %v", err)
   326  	}
   327  	fmt.Printf("measurement time: %v\n\n", time.Now().Sub(start))
   328  	if table != nil {
   329  		if err := reportResults(ctx, client, table, profiles); err != nil {
   330  			log.Fatalf("reportResults: %v", err)
   331  		}
   332  	}
   333  	printResults(profiles)
   334  }
   335  

View as plain text