...

Source file src/edge-infra.dev/pkg/f8n/devinfra/gcp/bigquery/bigquery.go

Documentation: edge-infra.dev/pkg/f8n/devinfra/gcp/bigquery

     1  package bigquery
     2  
     3  import (
     4  	"context"
     5  	"time"
     6  
     7  	"cloud.google.com/go/bigquery"
     8  	"google.golang.org/api/iterator"
     9  )
    10  
    11  type Test struct {
    12  	Name        string
    13  	Time        float64
    14  	Failed      bool
    15  	FailureText string `bigquery:"Failure_Text"`
    16  }
    17  
    18  type Metadata struct {
    19  	Key   string
    20  	Value string
    21  }
    22  
    23  type JobData struct {
    24  	Elapsed     float64
    25  	Started     time.Time
    26  	Finished    time.Time
    27  	Version     string
    28  	Path        string
    29  	Job         string
    30  	Number      int
    31  	Metadata    []Metadata
    32  	Test        []Test
    33  	TestsRun    int `bigquery:"Tests_Run"`
    34  	TestsFailed int `bigquery:"Tests_Failed"`
    35  	Passed      bool
    36  	Repos       string
    37  	RepoCommit  string `bigquery:"Repo_Commit"`
    38  }
    39  
    40  const (
    41  	DefaultProject     = "886862789596"
    42  	defaultDataset     = "edgejobsdataset"
    43  	defaultTable       = "edge-jobs-table"
    44  	defaultProjectName = "ret-edge-dev-infra"
    45  )
    46  
    47  type BigQuery struct {
    48  	Client      *bigquery.Client
    49  	projectID   string
    50  	datasetName string
    51  	tableName   string
    52  	queryPath   string
    53  	projectName string
    54  	ctx         context.Context
    55  }
    56  
    57  type Opts func(*BigQuery)
    58  
    59  func New(ctx context.Context, opts ...Opts) (*BigQuery, error) {
    60  	bq := &BigQuery{
    61  		ctx:         ctx,
    62  		projectID:   DefaultProject,
    63  		datasetName: defaultDataset,
    64  		tableName:   defaultTable,
    65  		projectName: defaultProjectName,
    66  	}
    67  
    68  	for _, opt := range opts {
    69  		opt(bq)
    70  	}
    71  
    72  	if bq.Client == nil {
    73  		c, err := bigquery.NewClient(bq.ctx, bq.projectID)
    74  		if err != nil {
    75  			return nil, err
    76  		}
    77  		bq.Client = c
    78  	}
    79  
    80  	return bq, nil
    81  }
    82  
    83  func WithProjectID(pid string) Opts {
    84  	return func(o *BigQuery) {
    85  		o.projectID = pid
    86  	}
    87  }
    88  
    89  func WithDatasetName(dn string) Opts {
    90  	return func(o *BigQuery) {
    91  		o.datasetName = dn
    92  	}
    93  }
    94  
    95  func WithTableName(tn string) Opts {
    96  	return func(o *BigQuery) {
    97  		o.tableName = tn
    98  	}
    99  }
   100  
   101  func WithProjectName(pn string) Opts {
   102  	return func(o *BigQuery) {
   103  		o.projectName = pn
   104  	}
   105  }
   106  
   107  func (bq *BigQuery) initDataset() *bigquery.Dataset {
   108  	return bq.Client.Dataset(bq.datasetName)
   109  }
   110  
   111  func (bq *BigQuery) initTable() *bigquery.Table {
   112  	return bq.initDataset().Table(bq.tableName)
   113  }
   114  
   115  func (bq *BigQuery) Insert(d interface{}) error {
   116  	table := bq.initTable()
   117  	u := table.Inserter()
   118  	return u.Put(bq.ctx, d)
   119  }
   120  
   121  func (bq *BigQuery) CloseClient() error {
   122  	return bq.Client.Close()
   123  }
   124  
   125  // Query ...
   126  func (bq *BigQuery) Query(query string, params []bigquery.QueryParameter) (*bigquery.RowIterator, error) {
   127  	q := bq.Client.Query(query)
   128  	q.DefaultDatasetID = bq.datasetName
   129  	q.DefaultProjectID = bq.projectID
   130  	q.Parameters = params
   131  
   132  	it, err := q.Read(bq.ctx)
   133  	if err != nil {
   134  		return nil, err
   135  	}
   136  	return it, nil
   137  }
   138  
   139  // GetRuns queries for all runs that have the matching repo / workflow / job
   140  func (bq *BigQuery) GetRuns(repo, w, j string) ([]JobData, error) {
   141  	q := bq.Client.Query(
   142  		`SELECT job, repos, started, elapsed, number, passed, metadata, repo_commit
   143  		FROM ` + bq.queryPath + `
   144  		WHERE repos = @repo
   145  		AND job = @job
   146  		ORDER BY started DESC
   147  		LIMIT 100
   148  		`)
   149  	q.Parameters = []bigquery.QueryParameter{
   150  		{
   151  			Name:  "repo",
   152  			Value: repo,
   153  		},
   154  		{
   155  			Name:  "job",
   156  			Value: w + "/" + j,
   157  		},
   158  	}
   159  
   160  	var rows []JobData
   161  
   162  	// Run the query and print results when the query job is completed.
   163  	job, err := q.Run(bq.ctx)
   164  	if err != nil {
   165  		return rows, err
   166  	}
   167  	status, err := job.Wait(bq.ctx)
   168  	if err != nil {
   169  		return rows, err
   170  	}
   171  	if err := status.Err(); err != nil {
   172  		return rows, err
   173  	}
   174  	it, err := job.Read(bq.ctx)
   175  	if err != nil {
   176  		return rows, err
   177  	}
   178  	for {
   179  		var row JobData
   180  		err := it.Next(&row)
   181  		if err == iterator.Done {
   182  			break
   183  		}
   184  		if err != nil {
   185  			return rows, err
   186  		}
   187  		if row.Repos != "" {
   188  			rows = append(rows, row)
   189  		}
   190  	}
   191  	return rows, nil
   192  }
   193  
   194  func (bq *BigQuery) GetRecentJobs() ([]JobData, error) {
   195  	q := bq.Client.Query(
   196  		`SELECT job, repos, started, finished, elapsed, tests_failed, tests_run, number
   197  		FROM ` + bq.queryPath + `
   198  		WHERE started BETWEEN TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -24 HOUR)
   199  		AND CURRENT_TIMESTAMP()
   200  		ORDER BY started desc`)
   201  	var rows []JobData
   202  
   203  	it, err := q.Read(bq.ctx)
   204  	if err != nil {
   205  		return rows, err
   206  	}
   207  	for {
   208  		var row JobData
   209  		err := it.Next(&row)
   210  		if err == iterator.Done {
   211  			break
   212  		}
   213  		if err != nil {
   214  			return rows, err
   215  		}
   216  		if row.Job != "" {
   217  			rows = append(rows, row)
   218  		}
   219  	}
   220  	return rows, nil
   221  }
   222  
   223  // GetRepos returns all the unique combinations of repos and jobs(workflow/job)
   224  func (bq *BigQuery) GetRepos() ([]JobData, error) {
   225  	q := bq.Client.Query(
   226  		`SELECT job
   227  		FROM ` + bq.queryPath + `
   228  		GROUP BY job`) // may need to be group each by eventually
   229  
   230  	var rows []JobData
   231  
   232  	it, err := q.Read(bq.ctx)
   233  	if err != nil {
   234  		return rows, err
   235  	}
   236  	for {
   237  		var row JobData
   238  		err := it.Next(&row)
   239  		if err == iterator.Done {
   240  			break
   241  		}
   242  		if err != nil {
   243  			return rows, err
   244  		}
   245  		if row.Job != "" {
   246  			rows = append(rows, row)
   247  		}
   248  	}
   249  	return rows, nil
   250  }
   251  
   252  // GetJob find all the results for repo / worflow / job name / run id
   253  func (bq *BigQuery) GetJob(repo, w, j string, r int) ([]JobData, error) {
   254  	q := bq.Client.Query(
   255  		`SELECT *
   256  		FROM ` + bq.queryPath + `
   257  		WHERE repos = @repo
   258  		AND job = @job
   259  		AND number = @run`)
   260  	q.Parameters = []bigquery.QueryParameter{
   261  		{
   262  			Name:  "repo",
   263  			Value: repo,
   264  		},
   265  		{
   266  			Name:  "job",
   267  			Value: w + "/" + j,
   268  		}, {
   269  			Name:  "run",
   270  			Value: r,
   271  		},
   272  	}
   273  
   274  	var rows []JobData
   275  
   276  	// Run the query and print results when the query job is completed.
   277  	job, err := q.Run(bq.ctx)
   278  	if err != nil {
   279  		return rows, err
   280  	}
   281  
   282  	status, err := job.Wait(bq.ctx)
   283  	if err != nil {
   284  		return rows, err
   285  	}
   286  	if err := status.Err(); err != nil {
   287  		return rows, err
   288  	}
   289  
   290  	it, err := job.Read(bq.ctx)
   291  	if err != nil {
   292  		return rows, err
   293  	}
   294  	for {
   295  		var row JobData
   296  		err := it.Next(&row)
   297  		if err == iterator.Done {
   298  			break
   299  		}
   300  		if err != nil {
   301  			return rows, err
   302  		}
   303  		rows = append(rows, row)
   304  	}
   305  	return rows, nil
   306  }
   307  
   308  // GetJob find all the results for repo / worflow / job name / run id
   309  func (bq *BigQuery) GetAllPRJobs(repo, pr string) ([]JobData, error) {
   310  	q := bq.Client.Query(
   311  		`SELECT *
   312  		FROM ` + bq.queryPath + `,UNNEST(metadata) AS meta
   313  		WHERE repos = @repo
   314  		AND meta.key = @pull
   315  		AND meta.value= @pr`)
   316  	// SELECT * FROM `ret-edge-dev-infra.edgejobsdataset.edge-jobs-table`,UNNEST(metadata) AS meta
   317  	// WHERE repos="edge-infra" AND meta.key="pull" AND meta.value="2149"
   318  
   319  	q.Parameters = []bigquery.QueryParameter{
   320  		{
   321  			Name:  "repo",
   322  			Value: repo,
   323  		}, {
   324  			Name:  "pull",
   325  			Value: "pull",
   326  		},
   327  		{
   328  			Name:  "pr",
   329  			Value: pr,
   330  		},
   331  	}
   332  
   333  	var rows []JobData
   334  
   335  	// Run the query and print results when the query job is completed.
   336  	job, err := q.Run(bq.ctx)
   337  	if err != nil {
   338  		return rows, err
   339  	}
   340  
   341  	status, err := job.Wait(bq.ctx)
   342  	if err != nil {
   343  		return rows, err
   344  	}
   345  	if err := status.Err(); err != nil {
   346  		return rows, err
   347  	}
   348  
   349  	it, err := job.Read(bq.ctx)
   350  	if err != nil {
   351  		return rows, err
   352  	}
   353  	for {
   354  		var row JobData
   355  		err := it.Next(&row)
   356  		if err == iterator.Done {
   357  			break
   358  		}
   359  		if err != nil {
   360  			return rows, err
   361  		}
   362  		rows = append(rows, row)
   363  	}
   364  	return rows, nil
   365  }
   366  

View as plain text