1 package sql
2
3 import (
4 "context"
5 "database/sql"
6 "fmt"
7 "net/url"
8 "os"
9 "strconv"
10 "strings"
11 "time"
12
13 "github.com/google/uuid"
14 _ "github.com/jackc/pgx/v4/stdlib"
15
16 "edge-infra.dev/pkg/f8n/devinfra/testinfra/model"
17 "edge-infra.dev/pkg/f8n/devinfra/testinfra/sql/schema"
18 )
19
20 var ErrDuplicate = fmt.Errorf("Value already exists in table")
21
22 type DBHandle struct {
23 *sql.DB
24 }
25
26 const (
27 postgres = "postgres"
28 gcpBucketLink = "https://console.cloud.google.com/storage/browser/edge-test-jobs/"
29 )
30
31 func FromEnv() (*DBHandle, error) {
32 user, ok := os.LookupEnv("DB_USER")
33 if !ok {
34 user = postgres
35 }
36 password, ok := os.LookupEnv("DB_PASS")
37 if !ok {
38 password = ""
39 }
40 host, ok := os.LookupEnv("DB_HOST")
41 if !ok {
42 host = "127.0.0.1"
43 }
44 port, ok := os.LookupEnv("DB_PORT")
45 if !ok {
46 port = "5432"
47 }
48 dbName, ok := os.LookupEnv("DB_NAME")
49 if !ok {
50 dbName = postgres
51 }
52
53 var dbMaxConns int
54 dbMaxConnsStr, ok := os.LookupEnv("DB_MAX_CONNS")
55 if ok {
56 dbMaxConnsParsed, err := strconv.Atoi(dbMaxConnsStr)
57 if err != nil {
58 return nil, fmt.Errorf("failed to parse DB_MAX_CONNS: %w", err)
59 }
60 dbMaxConns = dbMaxConnsParsed
61 } else {
62
63
64 dbMaxConns = 450
65 }
66
67 dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s", host, port, user, password, dbName)
68 db, err := sql.Open("pgx", dsn)
69 if err != nil {
70 return nil, fmt.Errorf("failed to open db with pgx driver: %w", err)
71 }
72
73 err = db.Ping()
74 if err != nil {
75 return nil, fmt.Errorf("failed to ping database: %w", err)
76 }
77
78 err = execSchema(db)
79 if err != nil {
80 return nil, fmt.Errorf("failed to execute schema: %w", err)
81 }
82
83
84
85 db.SetMaxOpenConns(dbMaxConns)
86 db.SetMaxIdleConns(dbMaxConns)
87 db.SetConnMaxIdleTime(time.Minute)
88
89 return &DBHandle{DB: db}, nil
90 }
91
92 func execSchema(db *sql.DB) error {
93 schemaSQL, err := schema.SQL.ReadFile("schema.sql")
94 if err != nil {
95 return err
96 }
97 _, err = db.Exec(string(schemaSQL))
98 if err != nil {
99 return err
100 }
101 return nil
102 }
103
104 func (db *DBHandle) InsertEdgeJob(ctx context.Context, job model.EdgeJob) (uuid.UUID, error) {
105
106 var exists bool
107 queryJobs := `
108 SELECT
109 EXISTS(
110 SELECT
111 1
112 FROM
113 test_infra.edge_jobs
114 WHERE
115 number=$1
116 );
117 `
118
119 row := db.QueryRowContext(ctx, queryJobs, job.Number)
120 err := row.Scan(
121 &exists,
122 )
123 if err != nil {
124 return uuid.Nil, err
125 }
126
127 if exists {
128 return uuid.Nil, ErrDuplicate
129 }
130
131 query := `
132 INSERT INTO test_infra.edge_jobs(elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit)
133 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
134 RETURNING id;
135 `
136
137 var id uuid.UUID
138 row = db.QueryRowContext(ctx, query,
139 job.Elapsed,
140 job.Started,
141 job.Finished,
142 job.Version,
143 job.Path,
144 job.Job,
145 job.Workflow,
146 job.Number,
147 job.TestsRun,
148 job.TestsFailed,
149 job.Passed,
150 job.Repos,
151 job.RepoCommit)
152 err = row.Scan(&id)
153 if err != nil {
154 return uuid.Nil, err
155 }
156
157 return id, nil
158 }
159
160 func (db *DBHandle) InsertEdgeJobMetadata(ctx context.Context, meta model.EdgeJobMetadata) (uuid.UUID, error) {
161 query := `
162 INSERT INTO test_infra.edge_job_metadatas(edge_job, key, value)
163 VALUES ($1, $2, $3)
164 RETURNING id;
165 `
166 var id uuid.UUID
167 row := db.QueryRowContext(ctx, query, meta.EdgeJob, meta.Key, meta.Value)
168 err := row.Scan(&id)
169 if err != nil {
170 return uuid.Nil, err
171 }
172
173 return id, nil
174 }
175
176 func (db *DBHandle) InsertEdgeJobTest(ctx context.Context, test model.EdgeJobTest) (uuid.UUID, error) {
177 query := `
178 INSERT INTO test_infra.edge_job_tests(edge_job,name,suite,time,failed,failure_text)
179 VALUES ($1, $2, $3, $4, $5, $6)
180 RETURNING id;
181 `
182 var id uuid.UUID
183 row := db.QueryRowContext(ctx, query, test.EdgeJob, test.Name, test.Suite, test.Time, test.Failed, test.FailureText)
184 err := row.Scan(&id)
185 if err != nil {
186 return uuid.Nil, err
187 }
188
189 return id, nil
190 }
191
192
193 func (db *DBHandle) GetTodaysEdgeJobs() ([]model.EdgeJob, error) {
194 query := `SELECT elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
195 FROM test_infra.edge_jobs AS jobs
196 WHERE jobs.started BETWEEN NOW() - INTERVAL '24 HOURS' AND NOW()
197 ORDER BY jobs.started DESC`
198
199 var tests []model.EdgeJob
200
201 rows, err := db.Query(query)
202 if err != nil {
203 return nil, err
204 }
205
206 for rows.Next() {
207 var test model.EdgeJob
208 err := rows.Scan(
209 &test.Elapsed,
210 &test.Started,
211 &test.Finished,
212 &test.Version,
213 &test.Path,
214 &test.Job,
215 &test.Workflow,
216 &test.Number,
217 &test.TestsRun,
218 &test.TestsFailed,
219 &test.Passed,
220 &test.Repos,
221 &test.RepoCommit,
222 )
223 if err != nil {
224 return nil, err
225 }
226
227 test.ElapsedSeconds = test.Elapsed.Seconds()
228 tests = append(tests, test)
229 }
230
231
232 if len(tests) == 0 {
233 fmt.Println("no jobs found in last 24 hours, getting last 100")
234 return db.GetRecentEdgeJobs(nil)
235 }
236
237 return tests, rows.Err()
238 }
239
240
241
242
243 func (db *DBHandle) GetRecentEdgeJobs(params url.Values) ([]model.EdgeJob, error) {
244
245
246 var metaKey, metaValue, metaJob, id, filter string
247
248
249
250
251 if !params.Has("key") && !params.Has("value") {
252 filter = "AND key='workflow_name'\n"
253 }
254
255 for k := range params {
256 v := fmt.Sprint("'", strings.Join(params[k], "', '"), "'")
257 filter = filter + fmt.Sprintf("AND %s IN (%s)\n", k, v)
258 }
259
260 query := fmt.Sprintf(`SELECT test_infra.edge_jobs.elapsed as elapsed, test_infra.edge_jobs.started as started, test_infra.edge_jobs.finished as finished, test_infra.edge_jobs.version as version, test_infra.edge_jobs.path as path, test_infra.edge_jobs.job as job, test_infra.edge_jobs.workflow as workflow, test_infra.edge_jobs.number as number, test_infra.edge_jobs.tests_run as tests_run, test_infra.edge_jobs.tests_failed as tests_failed, test_infra.edge_jobs.passed as passed, test_infra.edge_jobs.repos as repos, test_infra.edge_jobs.repo_commit as repo_commit, test_infra.edge_job_metadatas.key as key, test_infra.edge_job_metadatas.value as value, test_infra.edge_job_metadatas.edge_job, test_infra.edge_jobs.id
261 FROM test_infra.edge_jobs
262 INNER JOIN test_infra.edge_job_metadatas
263 ON test_infra.edge_jobs.id=test_infra.edge_job_metadatas.edge_job
264 WHERE test_infra.edge_jobs.workflow IN ('dsds-ci-nightly','hourly-slow-l2-test','hourly-stable-l2-test','hourly-serial-l2-test','hack-verify','iam-e2e','presubmit','presubmit-argo','postsubmit')
265 %s
266 ORDER BY test_infra.edge_jobs.started DESC LIMIT 100`, filter)
267
268 var tests []model.EdgeJob
269
270 rows, err := db.Query(query)
271 if err != nil {
272 return nil, err
273 }
274
275 for rows.Next() {
276 var test model.EdgeJob
277 err := rows.Scan(
278 &test.Elapsed,
279 &test.Started,
280 &test.Finished,
281 &test.Version,
282 &test.Path,
283 &test.Job,
284 &test.Workflow,
285 &test.Number,
286 &test.TestsRun,
287 &test.TestsFailed,
288 &test.Passed,
289 &test.Repos,
290 &test.RepoCommit,
291 &metaKey,
292 &metaValue,
293 &metaJob,
294 &id,
295 )
296 if err != nil {
297 return nil, err
298 }
299
300 tagline := fmt.Sprint(test.Repos, "/", test.Job)
301 if test.Job == "" {
302 test.Job = "argo"
303 tagline = fmt.Sprint(test.Job, "/", test.Workflow)
304 }
305 test.URLTagline = tagline
306
307
308 test.URL = fmt.Sprint("/", test.Repos, "/", test.Workflow, "/", test.Job, "/", test.Number)
309 test.ElapsedSeconds = test.Elapsed.Seconds()
310 tests = append(tests, test)
311 }
312
313 return tests, rows.Err()
314 }
315
316
317
318 func (db *DBHandle) GetRecentEdgeJobsByPlatform() (map[string]map[string]model.EdgeJob, error) {
319 query := `select test_infra.edge_jobs.started, test_infra.edge_jobs.number, test_infra.edge_jobs.job, test_infra.edge_jobs.repos, test_infra.edge_jobs.workflow, test_infra.edge_jobs.passed, test_infra.edge_jobs.tests_run, test_infra.edge_jobs.tests_failed, test_infra.edge_job_metadatas.value as platform, test_infra.edge_job_metadatas.key, test_infra.edge_job_metadatas.edge_job, test_infra.edge_jobs.id
320 from test_infra.edge_jobs
321 INNER JOIN test_infra.edge_job_metadatas
322 ON test_infra.edge_jobs.id=test_infra.edge_job_metadatas.edge_job
323 where test_infra.edge_job_metadatas.key = 'clusterVersion'
324 ORDER BY test_infra.edge_jobs.started DESC
325 LIMIT 100;`
326
327
328 var id, k, edgeJob string
329
330
331
332 tests := make(map[string]map[string]model.EdgeJob)
333
334 rows, err := db.Query(query)
335 if err != nil {
336 return nil, err
337 }
338
339 for rows.Next() {
340 var test model.EdgeJob
341 err := rows.Scan(
342 &test.Started,
343 &test.Number,
344 &test.Job,
345 &test.Repos,
346 &test.Workflow,
347 &test.Passed,
348 &test.TestsRun,
349 &test.TestsFailed,
350 &test.Platform,
351 &id,
352 &k,
353 &edgeJob,
354 )
355 if err != nil {
356 return nil, err
357 }
358
359
360 if test.Job == "" {
361 test.Job = "argo"
362 }
363 test.URL = fmt.Sprint("/", test.Repos, "/", test.Workflow, "/", test.Job, "/", test.Number)
364
365
366 if test.Platform != "" {
367 platformInfo := strings.Split(test.Platform, "-")
368 platformVersion := platformInfo[0]
369 platformType := strings.Split(platformInfo[1], ".")
370 platform := fmt.Sprintf("%s-%s", platformType[0], platformVersion)
371 test.Platform = platform
372
373
374 if _, ok := tests[test.Workflow]; !ok {
375 tests[test.Workflow] = make(map[string]model.EdgeJob)
376 }
377
378
379 if _, ok := tests[test.Workflow][platform]; !ok {
380 tests[test.Workflow][platform] = test
381 }
382 }
383 }
384
385 return tests, rows.Err()
386 }
387
388
389 func (db *DBHandle) GetRecentEdgeJobRuns(repo, workflow, job string) ([]model.EdgeJob, error) {
390 query := `SELECT elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
391 FROM test_infra.edge_jobs AS jobs
392 WHERE repos=$1 and job=$2 and workflow=$3
393 ORDER BY jobs.started DESC
394 LIMIT 100`
395
396 var tests []model.EdgeJob
397
398 rows, err := db.Query(query, repo, job, workflow)
399 if err != nil {
400 return nil, err
401 }
402
403 for rows.Next() {
404 var test model.EdgeJob
405 err := rows.Scan(
406 &test.Elapsed,
407 &test.Started,
408 &test.Finished,
409 &test.Version,
410 &test.Path,
411 &test.Job,
412 &test.Workflow,
413 &test.Number,
414 &test.TestsRun,
415 &test.TestsFailed,
416 &test.Passed,
417 &test.Repos,
418 &test.RepoCommit,
419 )
420 if err != nil {
421 return nil, err
422 }
423
424 test.ElapsedSeconds = test.Elapsed.Seconds()
425
426 tests = append(tests, test)
427 }
428
429 return tests, rows.Err()
430 }
431
432
433
434 func (db *DBHandle) GetEdgeJob(ctx context.Context, number string) (model.EdgeRun, error) {
435
436
437
438
439
440
441
442
443
444
445
446
447 var run model.EdgeRun
448
449 queryJobs := `
450 SELECT
451 id, elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
452 FROM test_infra.edge_jobs
453 where number=$1;
454 `
455
456 row := db.QueryRowContext(ctx, queryJobs, number)
457 err := row.Scan(
458 &run.EdgeJobID,
459 &run.Job.Elapsed,
460 &run.Job.Started,
461 &run.Job.Finished,
462 &run.Job.Version,
463 &run.Job.Path,
464 &run.Job.Job,
465 &run.Job.Workflow,
466 &run.Job.Number,
467 &run.Job.TestsRun,
468 &run.Job.TestsFailed,
469 &run.Job.Passed,
470 &run.Job.Repos,
471 &run.Job.RepoCommit,
472 )
473 if err != nil {
474 return model.EdgeRun{}, err
475 }
476
477 gcpURL := fmt.Sprint(gcpBucketLink, "actions/", run.Job.Repos, "/", run.Job.Workflow, "/", run.Job.Number, "/", run.Job.Job)
478 if run.Job.Job == "" {
479 gcpURL = fmt.Sprint(gcpBucketLink, "argo/", run.Job.Repos, "/", run.Job.Workflow, "/", run.Job.Number)
480 }
481 run.Job.GCPURL = gcpURL
482
483 run.Job.ElapsedSeconds = run.Job.Elapsed.Seconds()
484
485 run.Metadata, err = db.GetEdgeJobMetadata(run.EdgeJobID)
486 if err != nil {
487 fmt.Println(err)
488 }
489
490 run.Tests, err = db.GetEdgeJobTests(run.EdgeJobID)
491 if err != nil {
492 fmt.Println(err)
493 }
494
495 return run, nil
496 }
497
498
499 func (db *DBHandle) GetEdgeJobMetadata(edgeJobID uuid.UUID) ([]model.EdgeJobMetadata, error) {
500 queryMetas := `
501 SELECT
502 edge_job, key, value
503 FROM test_infra.edge_job_metadatas
504 WHERE edge_job=$1;
505 `
506
507 metas := []model.EdgeJobMetadata{}
508
509 rows, err := db.Query(queryMetas, edgeJobID.String())
510 if err != nil {
511 return []model.EdgeJobMetadata{}, err
512 }
513 for rows.Next() {
514 var meta model.EdgeJobMetadata
515
516 err := rows.Scan(
517 &meta.EdgeJob,
518 &meta.Key,
519 &meta.Value,
520 )
521 if err != nil {
522 return []model.EdgeJobMetadata{}, err
523 }
524 metas = append(metas, meta)
525 }
526
527 return metas, nil
528 }
529
530
531 func (db *DBHandle) GetEdgeJobTests(edgeJobID uuid.UUID) ([]model.EdgeJobTest, error) {
532 queryTests := `
533 SELECT
534 name, suite, time, failed, failure_text
535 FROM test_infra.edge_job_tests
536 WHERE edge_job=$1;
537 `
538
539 var tests []model.EdgeJobTest
540
541 rows, err := db.Query(queryTests, edgeJobID)
542 if err != nil {
543 return tests, err
544 }
545
546 for rows.Next() {
547 var test model.EdgeJobTest
548 err := rows.Scan(
549 &test.Name,
550 &test.Suite,
551 &test.Time,
552 &test.Failed,
553 &test.FailureText,
554 )
555 if err != nil {
556 return tests, err
557 }
558 test.TimeSeconds = test.Time.Seconds()
559
560 tests = append(tests, test)
561 }
562 return tests, nil
563 }
564
View as plain text