1 package bigquery
2
3 import (
4 "context"
5 "time"
6
7 "cloud.google.com/go/bigquery"
8 "google.golang.org/api/iterator"
9 )
10
// Test is one test-case record stored in the Test field of a JobData row.
type Test struct {
	// Name identifies the test case.
	Name string
	// Time is a floating-point timing value for the test.
	// NOTE(review): presumably a duration in seconds — confirm the unit.
	Time float64
	// Failed reports whether the test case failed.
	Failed bool
	// FailureText is mapped to the "Failure_Text" column by its struct tag.
	FailureText string `bigquery:"Failure_Text"`
}
17
// Metadata is a single key/value pair; JobData carries a slice of these in
// its Metadata field.
type Metadata struct {
	Key, Value string
}
22
23 type JobData struct {
24 Elapsed float64
25 Started time.Time
26 Finished time.Time
27 Version string
28 Path string
29 Job string
30 Number int
31 Metadata []Metadata
32 Test []Test
33 TestsRun int `bigquery:"Tests_Run"`
34 TestsFailed int `bigquery:"Tests_Failed"`
35 Passed bool
36 Repos string
37 RepoCommit string `bigquery:"Repo_Commit"`
38 }
39
// Defaults that New applies before running any options.
const (
	// DefaultProject is the default project identifier; New passes it to
	// bigquery.NewClient unless WithProjectID overrides it.
	// NOTE(review): the value looks like a numeric project number rather
	// than a project ID string — confirm.
	DefaultProject = "886862789596"
	// defaultDataset is the dataset name used unless WithDatasetName overrides it.
	defaultDataset = "edgejobsdataset"
	// defaultTable is the table name used unless WithTableName overrides it.
	defaultTable = "edge-jobs-table"
	// defaultProjectName is the project name used unless WithProjectName overrides it.
	defaultProjectName = "ret-edge-dev-infra"
)
46
// BigQuery bundles a BigQuery client with the project, dataset and table
// settings that the methods of this type operate on. Build one with New.
type BigQuery struct {
	// Client is the underlying BigQuery client. New creates one only when
	// no option has already set this field.
	Client *bigquery.Client
	// projectID is passed to bigquery.NewClient and used as the default
	// project of queries issued through Query.
	projectID string
	// datasetName selects the dataset for inserts and is the default
	// dataset of queries issued through Query.
	datasetName string
	// tableName selects the table that Insert writes to.
	tableName string
	// queryPath is the table reference spliced after FROM in the SQL built
	// by the Get* methods.
	queryPath string
	// projectName is the project name configured through WithProjectName.
	projectName string
	// ctx is the context given to New; it is reused for every client call
	// made through this value.
	ctx context.Context
}
56
// Opts is a functional option: New calls each one with the BigQuery value it
// is building so the option can override a default.
type Opts func(*BigQuery)
58
59 func New(ctx context.Context, opts ...Opts) (*BigQuery, error) {
60 bq := &BigQuery{
61 ctx: ctx,
62 projectID: DefaultProject,
63 datasetName: defaultDataset,
64 tableName: defaultTable,
65 projectName: defaultProjectName,
66 }
67
68 for _, opt := range opts {
69 opt(bq)
70 }
71
72 if bq.Client == nil {
73 c, err := bigquery.NewClient(bq.ctx, bq.projectID)
74 if err != nil {
75 return nil, err
76 }
77 bq.Client = c
78 }
79
80 return bq, nil
81 }
82
83 func WithProjectID(pid string) Opts {
84 return func(o *BigQuery) {
85 o.projectID = pid
86 }
87 }
88
89 func WithDatasetName(dn string) Opts {
90 return func(o *BigQuery) {
91 o.datasetName = dn
92 }
93 }
94
95 func WithTableName(tn string) Opts {
96 return func(o *BigQuery) {
97 o.tableName = tn
98 }
99 }
100
101 func WithProjectName(pn string) Opts {
102 return func(o *BigQuery) {
103 o.projectName = pn
104 }
105 }
106
107 func (bq *BigQuery) initDataset() *bigquery.Dataset {
108 return bq.Client.Dataset(bq.datasetName)
109 }
110
111 func (bq *BigQuery) initTable() *bigquery.Table {
112 return bq.initDataset().Table(bq.tableName)
113 }
114
115 func (bq *BigQuery) Insert(d interface{}) error {
116 table := bq.initTable()
117 u := table.Inserter()
118 return u.Put(bq.ctx, d)
119 }
120
121 func (bq *BigQuery) CloseClient() error {
122 return bq.Client.Close()
123 }
124
125
126 func (bq *BigQuery) Query(query string, params []bigquery.QueryParameter) (*bigquery.RowIterator, error) {
127 q := bq.Client.Query(query)
128 q.DefaultDatasetID = bq.datasetName
129 q.DefaultProjectID = bq.projectID
130 q.Parameters = params
131
132 it, err := q.Read(bq.ctx)
133 if err != nil {
134 return nil, err
135 }
136 return it, nil
137 }
138
139
140 func (bq *BigQuery) GetRuns(repo, w, j string) ([]JobData, error) {
141 q := bq.Client.Query(
142 `SELECT job, repos, started, elapsed, number, passed, metadata, repo_commit
143 FROM ` + bq.queryPath + `
144 WHERE repos = @repo
145 AND job = @job
146 ORDER BY started DESC
147 LIMIT 100
148 `)
149 q.Parameters = []bigquery.QueryParameter{
150 {
151 Name: "repo",
152 Value: repo,
153 },
154 {
155 Name: "job",
156 Value: w + "/" + j,
157 },
158 }
159
160 var rows []JobData
161
162
163 job, err := q.Run(bq.ctx)
164 if err != nil {
165 return rows, err
166 }
167 status, err := job.Wait(bq.ctx)
168 if err != nil {
169 return rows, err
170 }
171 if err := status.Err(); err != nil {
172 return rows, err
173 }
174 it, err := job.Read(bq.ctx)
175 if err != nil {
176 return rows, err
177 }
178 for {
179 var row JobData
180 err := it.Next(&row)
181 if err == iterator.Done {
182 break
183 }
184 if err != nil {
185 return rows, err
186 }
187 if row.Repos != "" {
188 rows = append(rows, row)
189 }
190 }
191 return rows, nil
192 }
193
194 func (bq *BigQuery) GetRecentJobs() ([]JobData, error) {
195 q := bq.Client.Query(
196 `SELECT job, repos, started, finished, elapsed, tests_failed, tests_run, number
197 FROM ` + bq.queryPath + `
198 WHERE started BETWEEN TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -24 HOUR)
199 AND CURRENT_TIMESTAMP()
200 ORDER BY started desc`)
201 var rows []JobData
202
203 it, err := q.Read(bq.ctx)
204 if err != nil {
205 return rows, err
206 }
207 for {
208 var row JobData
209 err := it.Next(&row)
210 if err == iterator.Done {
211 break
212 }
213 if err != nil {
214 return rows, err
215 }
216 if row.Job != "" {
217 rows = append(rows, row)
218 }
219 }
220 return rows, nil
221 }
222
223
224 func (bq *BigQuery) GetRepos() ([]JobData, error) {
225 q := bq.Client.Query(
226 `SELECT job
227 FROM ` + bq.queryPath + `
228 GROUP BY job`)
229
230 var rows []JobData
231
232 it, err := q.Read(bq.ctx)
233 if err != nil {
234 return rows, err
235 }
236 for {
237 var row JobData
238 err := it.Next(&row)
239 if err == iterator.Done {
240 break
241 }
242 if err != nil {
243 return rows, err
244 }
245 if row.Job != "" {
246 rows = append(rows, row)
247 }
248 }
249 return rows, nil
250 }
251
252
253 func (bq *BigQuery) GetJob(repo, w, j string, r int) ([]JobData, error) {
254 q := bq.Client.Query(
255 `SELECT *
256 FROM ` + bq.queryPath + `
257 WHERE repos = @repo
258 AND job = @job
259 AND number = @run`)
260 q.Parameters = []bigquery.QueryParameter{
261 {
262 Name: "repo",
263 Value: repo,
264 },
265 {
266 Name: "job",
267 Value: w + "/" + j,
268 }, {
269 Name: "run",
270 Value: r,
271 },
272 }
273
274 var rows []JobData
275
276
277 job, err := q.Run(bq.ctx)
278 if err != nil {
279 return rows, err
280 }
281
282 status, err := job.Wait(bq.ctx)
283 if err != nil {
284 return rows, err
285 }
286 if err := status.Err(); err != nil {
287 return rows, err
288 }
289
290 it, err := job.Read(bq.ctx)
291 if err != nil {
292 return rows, err
293 }
294 for {
295 var row JobData
296 err := it.Next(&row)
297 if err == iterator.Done {
298 break
299 }
300 if err != nil {
301 return rows, err
302 }
303 rows = append(rows, row)
304 }
305 return rows, nil
306 }
307
308
309 func (bq *BigQuery) GetAllPRJobs(repo, pr string) ([]JobData, error) {
310 q := bq.Client.Query(
311 `SELECT *
312 FROM ` + bq.queryPath + `,UNNEST(metadata) AS meta
313 WHERE repos = @repo
314 AND meta.key = @pull
315 AND meta.value= @pr`)
316
317
318
319 q.Parameters = []bigquery.QueryParameter{
320 {
321 Name: "repo",
322 Value: repo,
323 }, {
324 Name: "pull",
325 Value: "pull",
326 },
327 {
328 Name: "pr",
329 Value: pr,
330 },
331 }
332
333 var rows []JobData
334
335
336 job, err := q.Run(bq.ctx)
337 if err != nil {
338 return rows, err
339 }
340
341 status, err := job.Wait(bq.ctx)
342 if err != nil {
343 return rows, err
344 }
345 if err := status.Err(); err != nil {
346 return rows, err
347 }
348
349 it, err := job.Read(bq.ctx)
350 if err != nil {
351 return rows, err
352 }
353 for {
354 var row JobData
355 err := it.Next(&row)
356 if err == iterator.Done {
357 break
358 }
359 if err != nil {
360 return rows, err
361 }
362 rows = append(rows, row)
363 }
364 return rows, nil
365 }
366
View as plain text