package sql

import (
	"context"
	"database/sql"
	"fmt"
	"net/url"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/google/uuid"
	_ "github.com/jackc/pgx/v4/stdlib" // nolint:revive necessary for db driver

	"edge-infra.dev/pkg/f8n/devinfra/testinfra/model"
	"edge-infra.dev/pkg/f8n/devinfra/testinfra/sql/schema"
)

// ErrDuplicate is returned when the value being inserted already exists in the table.
var ErrDuplicate = fmt.Errorf("value already exists in table")

type DBHandle struct {
	*sql.DB
}

const (
	postgres      = "postgres"
	gcpBucketLink = "https://console.cloud.google.com/storage/browser/edge-test-jobs/"
)

// FromEnv builds a database handle from the DB_* environment variables,
// falling back to local defaults when a variable is unset.
func FromEnv() (*DBHandle, error) {
	user, ok := os.LookupEnv("DB_USER")
	if !ok {
		user = postgres
	}
	password, ok := os.LookupEnv("DB_PASS")
	if !ok {
		password = ""
	}
	host, ok := os.LookupEnv("DB_HOST")
	if !ok {
		host = "127.0.0.1"
	}
	port, ok := os.LookupEnv("DB_PORT")
	if !ok {
		port = "5432"
	}
	dbName, ok := os.LookupEnv("DB_NAME")
	if !ok {
		dbName = postgres
	}
	var dbMaxConns int
	dbMaxConnsStr, ok := os.LookupEnv("DB_MAX_CONNS")
	if ok {
		dbMaxConnsParsed, err := strconv.Atoi(dbMaxConnsStr)
		if err != nil {
			return nil, fmt.Errorf("failed to parse DB_MAX_CONNS: %w", err)
		}
		dbMaxConns = dbMaxConnsParsed
	} else {
		// AlloyDB assumed to have default config of 1000 max connections with reservation for superuser connections.
		// 450 represents half of the remaining 900 user connections, allowing a second replica for rollouts, failovers.
		dbMaxConns = 450
	}

	dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s", host, port, user, password, dbName)
	db, err := sql.Open("pgx", dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to open db with pgx driver: %w", err)
	}
	err = db.Ping()
	if err != nil {
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}
	err = execSchema(db)
	if err != nil {
		return nil, fmt.Errorf("failed to execute schema: %w", err)
	}

	// TODO(dk185217): Look into how db/sql or pgx driver handles sharing connections. Is setting a client side max
	// less than the server max sufficient?
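	//
	// Note on the TODO above: database/sql caps concurrently open connections at dbMaxConns,
	// keeps up to the same number idle for reuse, and recycles any connection idle for more
	// than a minute. The server's own max_connections limit still applies independently, so
	// this only helps while the client-side cap stays below the server cap.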
	db.SetMaxOpenConns(dbMaxConns)
	db.SetMaxIdleConns(dbMaxConns)
	db.SetConnMaxIdleTime(time.Minute)

	return &DBHandle{DB: db}, nil
}

func execSchema(db *sql.DB) error {
	schemaSQL, err := schema.SQL.ReadFile("schema.sql")
	if err != nil {
		return err
	}
	_, err = db.Exec(string(schemaSQL))
	if err != nil {
		return err
	}
	return nil
}

// InsertEdgeJob inserts a job row, returning ErrDuplicate if a job with the same number already exists.
func (db *DBHandle) InsertEdgeJob(ctx context.Context, job model.EdgeJob) (uuid.UUID, error) {
	// check if the job.number already exists in the database
	var exists bool
	queryJobs := `
	SELECT EXISTS(
		SELECT 1 FROM test_infra.edge_jobs WHERE number=$1
	);
	`
	row := db.QueryRowContext(ctx, queryJobs, job.Number)
	err := row.Scan(
		&exists,
	)
	if err != nil {
		return uuid.Nil, err
	}
	if exists {
		return uuid.Nil, ErrDuplicate
	}

	query := `
	INSERT INTO test_infra.edge_jobs(elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit)
	VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
	RETURNING id;
	`
	var id uuid.UUID
	row = db.QueryRowContext(ctx, query, job.Elapsed, job.Started, job.Finished, job.Version, job.Path, job.Job,
		job.Workflow, job.Number, job.TestsRun, job.TestsFailed, job.Passed, job.Repos, job.RepoCommit)
	err = row.Scan(&id)
	if err != nil {
		return uuid.Nil, err
	}
	return id, nil
}

// InsertEdgeJobMetadata inserts a key/value metadata row for a job.
func (db *DBHandle) InsertEdgeJobMetadata(ctx context.Context, meta model.EdgeJobMetadata) (uuid.UUID, error) {
	query := `
	INSERT INTO test_infra.edge_job_metadatas(edge_job, key, value)
	VALUES ($1, $2, $3)
	RETURNING id;
	`
	var id uuid.UUID
	row := db.QueryRowContext(ctx, query, meta.EdgeJob, meta.Key, meta.Value)
	err := row.Scan(&id)
	if err != nil {
		return uuid.Nil, err
	}
	return id, nil
}

// InsertEdgeJobTest inserts a single test result for a job.
func (db *DBHandle) InsertEdgeJobTest(ctx context.Context, test model.EdgeJobTest) (uuid.UUID, error) {
	query := `
	INSERT INTO test_infra.edge_job_tests(edge_job,name,suite,time,failed,failure_text)
	VALUES ($1, $2, $3, $4, $5, $6)
	RETURNING id;
	`
	var id uuid.UUID
	row := db.QueryRowContext(ctx, query, test.EdgeJob, test.Name, test.Suite, test.Time, test.Failed, test.FailureText)
	err := row.Scan(&id)
	if err != nil {
		return uuid.Nil, err
	}
	return id, nil
}

// GetTodaysEdgeJobs retrieves all jobs added to the database in the last 24 hours.
// If there are none, it falls back to the latest 100 recent jobs.
func (db *DBHandle) GetTodaysEdgeJobs() ([]model.EdgeJob, error) {
	query := `SELECT elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
	FROM test_infra.edge_jobs AS jobs
	WHERE jobs.started BETWEEN NOW() - INTERVAL '24 HOURS' AND NOW()
	ORDER BY jobs.started DESC`

	var tests []model.EdgeJob
	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	for rows.Next() {
		var test model.EdgeJob
		err := rows.Scan(
			&test.Elapsed,
			&test.Started,
			&test.Finished,
			&test.Version,
			&test.Path,
			&test.Job,
			&test.Workflow,
			&test.Number,
			&test.TestsRun,
			&test.TestsFailed,
			&test.Passed,
			&test.Repos,
			&test.RepoCommit,
		)
		if err != nil {
			return nil, err
		}
		test.ElapsedSeconds = test.Elapsed.Seconds()
		tests = append(tests, test)
	}
	// if there are no jobs in the last 24 hours just get the latest 100 results
	if len(tests) == 0 {
		fmt.Println("no jobs found in last 24 hours, getting last 100")
		return db.GetRecentEdgeJobs(nil)
	}
	return tests, rows.Err()
}

// GetRecentEdgeJobs retrieves the latest 100 jobs, filtered down to workflows that typically run tests.
// Optional filters passed as URL query params are added to the query.
func (db *DBHandle) GetRecentEdgeJobs(params url.Values) ([]model.EdgeJob, error) {
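	// Illustrative example (the exact query params are assumed, not taken from the callers):
	// a request carrying ?workflow=presubmit&value=v0.29 would append
	//   AND workflow IN ('presubmit')
	//   AND value IN ('v0.29')
	// to the WHERE clause built below. Values are interpolated directly into the SQL,
	// so params must come from a trusted source.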
	// metaKey, metaValue, metaJob, and id are throwaway vars
	// as they are used for the query itself, but not needed in the resulting []EdgeJob
	var metaKey, metaValue, metaJob, id, filter string

	// if metadata.key or metadata.value are not included in the filters,
	// filter on the "workflow_name" key to remove extra entries caused by other
	// metadata key-value pairs
	if !params.Has("key") && !params.Has("value") {
		filter = "AND key='workflow_name'\n"
	}
	for k := range params {
		v := fmt.Sprint("'", strings.Join(params[k], "', '"), "'")
		filter = filter + fmt.Sprintf("AND %s IN (%s)\n", k, v)
	}

	query := fmt.Sprintf(`SELECT test_infra.edge_jobs.elapsed as elapsed,
	test_infra.edge_jobs.started as started,
	test_infra.edge_jobs.finished as finished,
	test_infra.edge_jobs.version as version,
	test_infra.edge_jobs.path as path,
	test_infra.edge_jobs.job as job,
	test_infra.edge_jobs.workflow as workflow,
	test_infra.edge_jobs.number as number,
	test_infra.edge_jobs.tests_run as tests_run,
	test_infra.edge_jobs.tests_failed as tests_failed,
	test_infra.edge_jobs.passed as passed,
	test_infra.edge_jobs.repos as repos,
	test_infra.edge_jobs.repo_commit as repo_commit,
	test_infra.edge_job_metadatas.key as key,
	test_infra.edge_job_metadatas.value as value,
	test_infra.edge_job_metadatas.edge_job,
	test_infra.edge_jobs.id
	FROM test_infra.edge_jobs
	INNER JOIN test_infra.edge_job_metadatas
	ON test_infra.edge_jobs.id=test_infra.edge_job_metadatas.edge_job
	WHERE test_infra.edge_jobs.workflow IN ('dsds-ci-nightly','hourly-slow-l2-test','hourly-stable-l2-test','hourly-serial-l2-test','hack-verify','iam-e2e','presubmit','presubmit-argo','postsubmit')
	%s
	ORDER BY test_infra.edge_jobs.started DESC
	LIMIT 100`, filter)

	var tests []model.EdgeJob
	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	for rows.Next() {
		var test model.EdgeJob
		err := rows.Scan(
			&test.Elapsed,
			&test.Started,
			&test.Finished,
			&test.Version,
			&test.Path,
			&test.Job,
			&test.Workflow,
			&test.Number,
			&test.TestsRun,
			&test.TestsFailed,
			&test.Passed,
			&test.Repos,
			&test.RepoCommit,
			&metaKey,
			&metaValue,
			&metaJob,
			&id,
		)
		if err != nil {
			return nil, err
		}
		tagline := fmt.Sprint(test.Repos, "/", test.Job)
		if test.Job == "" {
			test.Job = "argo"
			tagline = fmt.Sprint(test.Job, "/", test.Workflow)
		}
		test.URLTagline = tagline
		// generate the URL
		test.URL = fmt.Sprint("/", test.Repos, "/", test.Workflow, "/", test.Job, "/", test.Number)
		test.ElapsedSeconds = test.Elapsed.Seconds()
		tests = append(tests, test)
	}
	return tests, rows.Err()
}
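
// Usage sketch for the per-platform view below (illustrative; the calling and rendering code
// is assumed and not part of this package):
//
//	byPlatform, err := db.GetRecentEdgeJobsByPlatform()
//	if err != nil { ... }
//	latest := byPlatform["hourly-serial-l2-test"]["gke-1.27.14"] // most recent EdgeJob for that pair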

// GetRecentEdgeJobsByPlatform retrieves the most recent jobs (up to 100) whose metadata has a value
// for the clusterVersion key and maps the most recent job for each workflow-platform pair.
func (db *DBHandle) GetRecentEdgeJobsByPlatform() (map[string]map[string]model.EdgeJob, error) {
	query := `select test_infra.edge_jobs.started,
	test_infra.edge_jobs.number,
	test_infra.edge_jobs.job,
	test_infra.edge_jobs.repos,
	test_infra.edge_jobs.workflow,
	test_infra.edge_jobs.passed,
	test_infra.edge_jobs.tests_run,
	test_infra.edge_jobs.tests_failed,
	test_infra.edge_job_metadatas.value as platform,
	test_infra.edge_job_metadatas.key,
	test_infra.edge_job_metadatas.edge_job,
	test_infra.edge_jobs.id
	from test_infra.edge_jobs
	INNER JOIN test_infra.edge_job_metadatas
	ON test_infra.edge_jobs.id=test_infra.edge_job_metadatas.edge_job
	where test_infra.edge_job_metadatas.key = 'clusterVersion'
	ORDER BY test_infra.edge_jobs.started DESC
	LIMIT 100;`

	// variables for storing columns necessary for the query, but not for the EdgeJob itself
	var id, k, edgeJob string
	// Each workflow will be the key to a map of EdgeJobs whose key is the platform
	// e.g. tests["hourly-serial-l2-test"]["gke-1.27.14"] = EdgeJob with those values
	tests := make(map[string]map[string]model.EdgeJob)
	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	for rows.Next() {
		var test model.EdgeJob
		err := rows.Scan(
			&test.Started,
			&test.Number,
			&test.Job,
			&test.Repos,
			&test.Workflow,
			&test.Passed,
			&test.TestsRun,
			&test.TestsFailed,
			&test.Platform,
			&k,
			&edgeJob,
			&id,
		)
		if err != nil {
			return nil, err
		}
		// generate the URL
		if test.Job == "" {
			test.Job = "argo"
		}
		test.URL = fmt.Sprint("/", test.Repos, "/", test.Workflow, "/", test.Job, "/", test.Number)
		// Parse test.Platform (a clusterVersion value such as "1.29.6-gke.1234") into <type>-<version>, e.g. "gke-1.29.6"
		if test.Platform != "" {
			platformInfo := strings.Split(test.Platform, "-")
			platformVersion := platformInfo[0]
			platformType := strings.Split(platformInfo[1], ".")
			platform := fmt.Sprintf("%s-%s", platformType[0], platformVersion)
			test.Platform = platform
			// Create map for workflow
			if _, ok := tests[test.Workflow]; !ok {
				tests[test.Workflow] = make(map[string]model.EdgeJob)
			}
			// Only add the test if that workflow-platform pair isn't in the map yet
			if _, ok := tests[test.Workflow][platform]; !ok {
				tests[test.Workflow][platform] = test
			}
		}
	}
	return tests, rows.Err()
}

// GetRecentEdgeJobRuns retrieves the latest 100 runs for a given repo / workflow / job.
func (db *DBHandle) GetRecentEdgeJobRuns(repo, workflow, job string) ([]model.EdgeJob, error) {
	query := `SELECT elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
	FROM test_infra.edge_jobs AS jobs
	WHERE repos=$1 and job=$2 and workflow=$3
	ORDER BY jobs.started DESC
	LIMIT 100`

	var tests []model.EdgeJob
	rows, err := db.Query(query, repo, job, workflow)
	if err != nil {
		return nil, err
	}
	for rows.Next() {
		var test model.EdgeJob
		err := rows.Scan(
			&test.Elapsed,
			&test.Started,
			&test.Finished,
			&test.Version,
			&test.Path,
			&test.Job,
			&test.Workflow,
			&test.Number,
			&test.TestsRun,
			&test.TestsFailed,
			&test.Passed,
			&test.Repos,
			&test.RepoCommit,
		)
		if err != nil {
			return nil, err
		}
		test.ElapsedSeconds = test.Elapsed.Seconds()
		tests = append(tests, test)
	}
	return tests, rows.Err()
}
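
// For reference, the artifact links built by GetEdgeJob below resolve to bucket paths of the form
// (placeholders are illustrative, derived from gcpBucketLink and the job fields used below):
//
//	.../edge-test-jobs/actions/<repos>/<workflow>/<number>/<job>   GitHub Actions jobs
//	.../edge-test-jobs/argo/<repos>/<workflow>/<number>            Argo jobs (empty job field)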

// GetEdgeJob retrieves a specific job with its metadata and test data
// TODO(ss185994): inner join or cte to gather all the data would be better than 3 separate selects
func (db *DBHandle) GetEdgeJob(ctx context.Context, number string) (model.EdgeRun, error) {
	// query := `
	// SELECT
	// 	*
	// FROM
	// 	test_infra.edge_jobs jobs
	// INNER JOIN test_infra.edge_job_metadatas meta
	// 	ON meta.edge_job = jobs.id
	// INNER JOIN test_infra.edge_job_tests tests
	// 	ON tests.edge_job = jobs.id
	// WHERE number = $1;
	// `
	var run model.EdgeRun
	queryJobs := `
	SELECT id, elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
	FROM test_infra.edge_jobs
	where number=$1;
	`
	row := db.QueryRowContext(ctx, queryJobs, number)
	err := row.Scan(
		&run.EdgeJobID,
		&run.Job.Elapsed,
		&run.Job.Started,
		&run.Job.Finished,
		&run.Job.Version,
		&run.Job.Path,
		&run.Job.Job,
		&run.Job.Workflow,
		&run.Job.Number,
		&run.Job.TestsRun,
		&run.Job.TestsFailed,
		&run.Job.Passed,
		&run.Job.Repos,
		&run.Job.RepoCommit,
	)
	if err != nil {
		return model.EdgeRun{}, err
	}

	gcpURL := fmt.Sprint(gcpBucketLink, "actions/", run.Job.Repos, "/", run.Job.Workflow, "/", run.Job.Number, "/", run.Job.Job)
	if run.Job.Job == "" {
		gcpURL = fmt.Sprint(gcpBucketLink, "argo/", run.Job.Repos, "/", run.Job.Workflow, "/", run.Job.Number)
	}
	run.Job.GCPURL = gcpURL
	run.Job.ElapsedSeconds = run.Job.Elapsed.Seconds()

	run.Metadata, err = db.GetEdgeJobMetadata(run.EdgeJobID)
	if err != nil {
		fmt.Println(err)
	}
	run.Tests, err = db.GetEdgeJobTests(run.EdgeJobID)
	if err != nil {
		fmt.Println(err)
	}
	return run, nil
}

// GetEdgeJobMetadata retrieves metadata for a specific jobID
func (db *DBHandle) GetEdgeJobMetadata(edgeJobID uuid.UUID) ([]model.EdgeJobMetadata, error) {
	queryMetas := `
	SELECT edge_job, key, value
	FROM test_infra.edge_job_metadatas
	WHERE edge_job=$1;
	`
	metas := []model.EdgeJobMetadata{}
	rows, err := db.Query(queryMetas, edgeJobID.String())
	if err != nil {
		return []model.EdgeJobMetadata{}, err
	}
	for rows.Next() {
		var meta model.EdgeJobMetadata
		err := rows.Scan(
			&meta.EdgeJob,
			&meta.Key,
			&meta.Value,
		)
		if err != nil {
			return []model.EdgeJobMetadata{}, err
		}
		metas = append(metas, meta)
	}
	return metas, rows.Err()
}

// GetEdgeJobTests retrieves all tests associated with a given jobID
func (db *DBHandle) GetEdgeJobTests(edgeJobID uuid.UUID) ([]model.EdgeJobTest, error) {
	queryTests := `
	SELECT name, suite, time, failed, failure_text
	FROM test_infra.edge_job_tests
	WHERE edge_job=$1;
	`
	var tests []model.EdgeJobTest
	rows, err := db.Query(queryTests, edgeJobID)
	if err != nil {
		return tests, err
	}
	for rows.Next() {
		var test model.EdgeJobTest
		err := rows.Scan(
			&test.Name,
			&test.Suite,
			&test.Time,
			&test.Failed,
			&test.FailureText,
		)
		if err != nil {
			return tests, err
		}
		test.TimeSeconds = test.Time.Seconds()
		tests = append(tests, test)
	}
	return tests, rows.Err()
}
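
// Usage sketch (illustrative only; the calling code, request wiring, and environment values are
// assumed and not part of this package):
//
//	db, err := FromEnv() // reads DB_USER, DB_PASS, DB_HOST, DB_PORT, DB_NAME, DB_MAX_CONNS
//	if err != nil {
//		// handle connection or schema error
//	}
//	run, err := db.GetEdgeJob(ctx, "12345")               // a single run with metadata and tests
//	recent, err := db.GetRecentEdgeJobs(req.URL.Query())  // recent runs, optionally filtered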