// Package bigquery wraps the Cloud BigQuery client with helpers for inserting
// job-result rows into a table and querying them back.
package bigquery

import (
	"context"
	"time"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

// Test is a single test result nested inside a JobData row.
type Test struct {
	Name        string
	Time        float64
	Failed      bool
	FailureText string `bigquery:"Failure_Text"`
}

// Metadata is one key/value pair attached to a JobData row.
type Metadata struct {
	Key   string
	Value string
}

// JobData is the row shape used both for inserts and for loading query
// results. Queries that select only a subset of columns leave the remaining
// fields at their zero values.
type JobData struct {
	Elapsed     float64
	Started     time.Time
	Finished    time.Time
	Version     string
	Path        string
	Job         string
	Number      int
	Metadata    []Metadata
	Test        []Test
	TestsRun    int `bigquery:"Tests_Run"`
	TestsFailed int `bigquery:"Tests_Failed"`
	Passed      bool
	Repos       string
	RepoCommit  string `bigquery:"Repo_Commit"`
}

// Defaults applied by New when no overriding option is supplied.
const (
	// DefaultProject is the project ID handed to bigquery.NewClient.
	// NOTE(review): looks like a numeric GCP project number — confirm.
	DefaultProject = "886862789596"
	defaultDataset = "edgejobsdataset"
	defaultTable   = "edge-jobs-table"
	// defaultProjectName is the project component of the table path used in
	// the FROM clause of the Get* queries.
	defaultProjectName = "ret-edge-dev-infra"
)

// BigQuery bundles a BigQuery client with the dataset/table it operates on.
// The context passed to New is retained and reused for every request.
type BigQuery struct {
	Client      *bigquery.Client
	projectID   string
	datasetName string
	tableName   string
	// queryPath is the backtick-quoted `project.dataset.table` path spliced
	// into the FROM clause of the Get* queries.
	queryPath   string
	projectName string
	ctx         context.Context
}

// Opts is a functional option that customises a BigQuery value inside New.
type Opts func(*BigQuery)

// New builds a BigQuery helper using the package defaults, applies opts in
// order, and creates a client for the configured project ID unless an option
// already set Client. It returns the error from bigquery.NewClient, if any.
func New(ctx context.Context, opts ...Opts) (*BigQuery, error) {
	bq := &BigQuery{
		ctx:         ctx,
		projectID:   DefaultProject,
		datasetName: defaultDataset,
		tableName:   defaultTable,
		projectName: defaultProjectName,
	}
	for _, opt := range opts {
		if opt == nil {
			// Skip nil options rather than panicking on the call.
			continue
		}
		opt(bq)
	}

	// Derive the table path only after the options ran so that overridden
	// project/dataset/table names are honoured. Without this the Get* queries
	// would be sent with an empty FROM target.
	if bq.queryPath == "" {
		bq.queryPath = "`" + bq.projectName + "." + bq.datasetName + "." + bq.tableName + "`"
	}

	if bq.Client == nil {
		c, err := bigquery.NewClient(bq.ctx, bq.projectID)
		if err != nil {
			return nil, err
		}
		bq.Client = c
	}
	return bq, nil
}

// WithProjectID overrides the project ID used to create the client and as the
// default project in Query.
func WithProjectID(pid string) Opts {
	return func(o *BigQuery) {
		o.projectID = pid
	}
}

// WithDatasetName overrides the dataset name.
func WithDatasetName(dn string) Opts {
	return func(o *BigQuery) {
		o.datasetName = dn
	}
}

// WithTableName overrides the table name.
func WithTableName(tn string) Opts {
	return func(o *BigQuery) {
		o.tableName = tn
	}
}

// WithProjectName overrides the project name used in the query table path.
func WithProjectName(pn string) Opts {
	return func(o *BigQuery) {
		o.projectName = pn
	}
}

// initDataset returns a handle to the configured dataset.
func (bq *BigQuery) initDataset() *bigquery.Dataset {
	return bq.Client.Dataset(bq.datasetName)
}

// initTable returns a handle to the configured table inside the dataset.
func (bq *BigQuery) initTable() *bigquery.Table {
	return bq.initDataset().Table(bq.tableName)
}

// Insert uploads d to the configured table through the table's Inserter.
// The accepted shapes of d are whatever Inserter.Put accepts.
func (bq *BigQuery) Insert(d interface{}) error {
	table := bq.initTable()
	u := table.Inserter()
	return u.Put(bq.ctx, d)
}

// CloseClient closes the underlying BigQuery client.
func (bq *BigQuery) CloseClient() error {
	return bq.Client.Close()
}

// Query runs an arbitrary query with the given parameters, using the
// configured dataset and project ID as defaults for unqualified table names,
// and returns an iterator over the result rows.
func (bq *BigQuery) Query(query string, params []bigquery.QueryParameter) (*bigquery.RowIterator, error) {
	q := bq.Client.Query(query)
	q.DefaultDatasetID = bq.datasetName
	q.DefaultProjectID = bq.projectID
	q.Parameters = params
	it, err := q.Read(bq.ctx)
	if err != nil {
		return nil, err
	}
	return it, nil
}

// runAndRead starts q as a job, waits for it to complete, and returns an
// iterator over its results. A failed job status is reported as an error.
func (bq *BigQuery) runAndRead(q *bigquery.Query) (*bigquery.RowIterator, error) {
	job, err := q.Run(bq.ctx)
	if err != nil {
		return nil, err
	}
	status, err := job.Wait(bq.ctx)
	if err != nil {
		return nil, err
	}
	if err := status.Err(); err != nil {
		return nil, err
	}
	return job.Read(bq.ctx)
}

// collectRows drains it into a slice of JobData. When keep is non-nil only
// rows for which it returns true are retained. If iteration fails, the rows
// gathered so far are returned together with the error.
func collectRows(it *bigquery.RowIterator, keep func(*JobData) bool) ([]JobData, error) {
	var rows []JobData
	for {
		var row JobData
		err := it.Next(&row)
		if err == iterator.Done {
			return rows, nil
		}
		if err != nil {
			return rows, err
		}
		if keep == nil || keep(&row) {
			rows = append(rows, row)
		}
	}
}

// hasRepos reports whether the row carries a non-empty Repos value.
func hasRepos(r *JobData) bool { return r.Repos != "" }

// hasJob reports whether the row carries a non-empty Job value.
func hasJob(r *JobData) bool { return r.Job != "" }

// GetRuns returns up to 100 of the most recently started runs whose repos
// column equals repo and whose job column equals "w/j" (workflow/job). Rows
// with an empty Repos value are dropped.
func (bq *BigQuery) GetRuns(repo, w, j string) ([]JobData, error) {
	q := bq.Client.Query(
		`SELECT job, repos, started, elapsed, number, passed, metadata, repo_commit FROM ` + bq.queryPath +
			` WHERE repos = @repo AND job = @job ORDER BY started DESC LIMIT 100 `)
	q.Parameters = []bigquery.QueryParameter{
		{Name: "repo", Value: repo},
		{Name: "job", Value: w + "/" + j},
	}

	it, err := bq.runAndRead(q)
	if err != nil {
		return nil, err
	}
	return collectRows(it, hasRepos)
}

// GetRecentJobs returns the jobs started within the last 24 hours, newest
// first. Rows with an empty Job value are dropped.
func (bq *BigQuery) GetRecentJobs() ([]JobData, error) {
	q := bq.Client.Query(
		`SELECT job, repos, started, finished, elapsed, tests_failed, tests_run, number FROM ` + bq.queryPath +
			` WHERE started BETWEEN TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -24 HOUR) AND CURRENT_TIMESTAMP() ORDER BY started desc`)

	it, err := q.Read(bq.ctx)
	if err != nil {
		return nil, err
	}
	return collectRows(it, hasJob)
}

// GetRepos returns one row per distinct job (workflow/job) value; only the
// Job field of each returned JobData is populated by the query. Rows with an
// empty Job value are dropped.
func (bq *BigQuery) GetRepos() ([]JobData, error) {
	// May eventually need to group by additional columns (e.g. repos).
	q := bq.Client.Query(
		`SELECT job FROM ` + bq.queryPath + ` GROUP BY job`)

	it, err := q.Read(bq.ctx)
	if err != nil {
		return nil, err
	}
	return collectRows(it, hasJob)
}

// GetJob returns every row matching repo, job "w/j" (workflow/job) and run
// number r.
func (bq *BigQuery) GetJob(repo, w, j string, r int) ([]JobData, error) {
	q := bq.Client.Query(
		`SELECT * FROM ` + bq.queryPath + ` WHERE repos = @repo AND job = @job AND number = @run`)
	q.Parameters = []bigquery.QueryParameter{
		{Name: "repo", Value: repo},
		{Name: "job", Value: w + "/" + j},
		{Name: "run", Value: r},
	}

	it, err := bq.runAndRead(q)
	if err != nil {
		return nil, err
	}
	return collectRows(it, nil)
}

// GetAllPRJobs returns every row for repo whose metadata contains an entry
// with key "pull" and value pr.
//
// Example of the resulting query:
//
//	SELECT * FROM `ret-edge-dev-infra.edgejobsdataset.edge-jobs-table`,UNNEST(metadata) AS meta
//	WHERE repos="edge-infra" AND meta.key="pull" AND meta.value="2149"
func (bq *BigQuery) GetAllPRJobs(repo, pr string) ([]JobData, error) {
	q := bq.Client.Query(
		`SELECT * FROM ` + bq.queryPath + `,UNNEST(metadata) AS meta WHERE repos = @repo AND meta.key = @pull AND meta.value= @pr`)
	q.Parameters = []bigquery.QueryParameter{
		{Name: "repo", Value: repo},
		{Name: "pull", Value: "pull"},
		{Name: "pr", Value: pr},
	}

	it, err := bq.runAndRead(q)
	if err != nil {
		return nil, err
	}
	return collectRows(it, nil)
}