...

Source file src/edge-infra.dev/pkg/f8n/devinfra/testinfra/sql/sql.go

Documentation: edge-infra.dev/pkg/f8n/devinfra/testinfra/sql

     1  package sql
     2  
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net/url"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/google/uuid"
	_ "github.com/jackc/pgx/v4/stdlib" // nolint:revive necessary for db driver

	"edge-infra.dev/pkg/f8n/devinfra/testinfra/model"
	"edge-infra.dev/pkg/f8n/devinfra/testinfra/sql/schema"
)
    19  
    20  var ErrDuplicate = fmt.Errorf("Value already exists in table")
    21  
    22  type DBHandle struct {
    23  	*sql.DB
    24  }
    25  
    26  const (
    27  	postgres      = "postgres"
    28  	gcpBucketLink = "https://console.cloud.google.com/storage/browser/edge-test-jobs/"
    29  )
    30  
    31  func FromEnv() (*DBHandle, error) {
    32  	user, ok := os.LookupEnv("DB_USER")
    33  	if !ok {
    34  		user = postgres
    35  	}
    36  	password, ok := os.LookupEnv("DB_PASS")
    37  	if !ok {
    38  		password = ""
    39  	}
    40  	host, ok := os.LookupEnv("DB_HOST")
    41  	if !ok {
    42  		host = "127.0.0.1"
    43  	}
    44  	port, ok := os.LookupEnv("DB_PORT")
    45  	if !ok {
    46  		port = "5432"
    47  	}
    48  	dbName, ok := os.LookupEnv("DB_NAME")
    49  	if !ok {
    50  		dbName = postgres
    51  	}
    52  
    53  	var dbMaxConns int
    54  	dbMaxConnsStr, ok := os.LookupEnv("DB_MAX_CONNS")
    55  	if ok {
    56  		dbMaxConnsParsed, err := strconv.Atoi(dbMaxConnsStr)
    57  		if err != nil {
    58  			return nil, fmt.Errorf("failed to parse DB_MAX_CONNS: %w", err)
    59  		}
    60  		dbMaxConns = dbMaxConnsParsed
    61  	} else {
    62  		// AlloyDB assumed to have default config of 1000 max connections with reservation for superuser connections.
    63  		// 450 represents half of the remaining 900 user connections, allowing a second replica for rollouts, failovers.
    64  		dbMaxConns = 450
    65  	}
    66  
    67  	dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s", host, port, user, password, dbName)
    68  	db, err := sql.Open("pgx", dsn)
    69  	if err != nil {
    70  		return nil, fmt.Errorf("failed to open db with pgx driver: %w", err)
    71  	}
    72  
    73  	err = db.Ping()
    74  	if err != nil {
    75  		return nil, fmt.Errorf("failed to ping database: %w", err)
    76  	}
    77  
    78  	err = execSchema(db)
    79  	if err != nil {
    80  		return nil, fmt.Errorf("failed to execute schema: %w", err)
    81  	}
    82  
    83  	// TODO(dk185217): Look into how db/sql or pgx driver handles sharing connections. Is setting a client side max
    84  	// less than the server max sufficient?
    85  	db.SetMaxOpenConns(dbMaxConns)
    86  	db.SetMaxIdleConns(dbMaxConns)
    87  	db.SetConnMaxIdleTime(time.Minute)
    88  
    89  	return &DBHandle{DB: db}, nil
    90  }
    91  
    92  func execSchema(db *sql.DB) error {
    93  	schemaSQL, err := schema.SQL.ReadFile("schema.sql")
    94  	if err != nil {
    95  		return err
    96  	}
    97  	_, err = db.Exec(string(schemaSQL))
    98  	if err != nil {
    99  		return err
   100  	}
   101  	return nil
   102  }
   103  
   104  func (db *DBHandle) InsertEdgeJob(ctx context.Context, job model.EdgeJob) (uuid.UUID, error) {
   105  	// check if the job.number already exists in the database
   106  	var exists bool
   107  	queryJobs := `
   108  	SELECT
   109    EXISTS(
   110      SELECT
   111        1
   112      FROM
   113        test_infra.edge_jobs
   114      WHERE
   115        number=$1
   116    );
   117  	`
   118  
   119  	row := db.QueryRowContext(ctx, queryJobs, job.Number)
   120  	err := row.Scan(
   121  		&exists,
   122  	)
   123  	if err != nil {
   124  		return uuid.Nil, err
   125  	}
   126  
   127  	if exists {
   128  		return uuid.Nil, ErrDuplicate
   129  	}
   130  
   131  	query := `
   132  INSERT INTO test_infra.edge_jobs(elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit)
   133  VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
   134  RETURNING id;
   135  `
   136  
   137  	var id uuid.UUID
   138  	row = db.QueryRowContext(ctx, query,
   139  		job.Elapsed,
   140  		job.Started,
   141  		job.Finished,
   142  		job.Version,
   143  		job.Path,
   144  		job.Job,
   145  		job.Workflow,
   146  		job.Number,
   147  		job.TestsRun,
   148  		job.TestsFailed,
   149  		job.Passed,
   150  		job.Repos,
   151  		job.RepoCommit)
   152  	err = row.Scan(&id)
   153  	if err != nil {
   154  		return uuid.Nil, err
   155  	}
   156  
   157  	return id, nil
   158  }
   159  
   160  func (db *DBHandle) InsertEdgeJobMetadata(ctx context.Context, meta model.EdgeJobMetadata) (uuid.UUID, error) {
   161  	query := `
   162  INSERT INTO test_infra.edge_job_metadatas(edge_job, key, value)
   163  VALUES ($1, $2, $3)
   164  RETURNING id;
   165  `
   166  	var id uuid.UUID
   167  	row := db.QueryRowContext(ctx, query, meta.EdgeJob, meta.Key, meta.Value)
   168  	err := row.Scan(&id)
   169  	if err != nil {
   170  		return uuid.Nil, err
   171  	}
   172  
   173  	return id, nil
   174  }
   175  
   176  func (db *DBHandle) InsertEdgeJobTest(ctx context.Context, test model.EdgeJobTest) (uuid.UUID, error) {
   177  	query := `
   178  INSERT INTO test_infra.edge_job_tests(edge_job,name,suite,time,failed,failure_text)
   179  VALUES ($1, $2, $3, $4, $5, $6)
   180  RETURNING id;
   181  `
   182  	var id uuid.UUID
   183  	row := db.QueryRowContext(ctx, query, test.EdgeJob, test.Name, test.Suite, test.Time, test.Failed, test.FailureText)
   184  	err := row.Scan(&id)
   185  	if err != nil {
   186  		return uuid.Nil, err
   187  	}
   188  
   189  	return id, nil
   190  }
   191  
   192  // GetRecentEdgeJobs retrieves all jobs added to the database in the last 24 hours
   193  func (db *DBHandle) GetTodaysEdgeJobs() ([]model.EdgeJob, error) {
   194  	query := `SELECT elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
   195  FROM test_infra.edge_jobs AS jobs
   196  WHERE jobs.started BETWEEN NOW() - INTERVAL '24 HOURS' AND NOW()
   197  ORDER BY jobs.started DESC`
   198  
   199  	var tests []model.EdgeJob
   200  
   201  	rows, err := db.Query(query)
   202  	if err != nil {
   203  		return nil, err
   204  	}
   205  
   206  	for rows.Next() {
   207  		var test model.EdgeJob
   208  		err := rows.Scan(
   209  			&test.Elapsed,
   210  			&test.Started,
   211  			&test.Finished,
   212  			&test.Version,
   213  			&test.Path,
   214  			&test.Job,
   215  			&test.Workflow,
   216  			&test.Number,
   217  			&test.TestsRun,
   218  			&test.TestsFailed,
   219  			&test.Passed,
   220  			&test.Repos,
   221  			&test.RepoCommit,
   222  		)
   223  		if err != nil {
   224  			return nil, err
   225  		}
   226  
   227  		test.ElapsedSeconds = test.Elapsed.Seconds()
   228  		tests = append(tests, test)
   229  	}
   230  
   231  	// if there are no jobs in the last 24 hours just get the latest 100 results
   232  	if len(tests) == 0 {
   233  		fmt.Println("no jobs found in last 24 hours, getting last 100")
   234  		return db.GetRecentEdgeJobs(nil)
   235  	}
   236  
   237  	return tests, rows.Err()
   238  }
   239  
   240  // GetRecentEdgeJobs retrieves all jobs added to the database in the last 24 hours
   241  // jobs are filtered down to workflows that typically run tests
   242  // optional filters with URL query params can also be passed and added to the query
   243  func (db *DBHandle) GetRecentEdgeJobs(params url.Values) ([]model.EdgeJob, error) {
   244  	// metaKey, metaValue, metaJob, and id are throwaway vars
   245  	// as they are used for the query itself, but not needed in the resulting []EdgeJob
   246  	var metaKey, metaValue, metaJob, id, filter string
   247  
   248  	// if metadata.key or metadata.value are not included in the filters,
   249  	// filter on the "workflow_name" key to remove extra entries caused by other
   250  	// metadata key-value pairs
   251  	if !params.Has("key") && !params.Has("value") {
   252  		filter = "AND key='workflow_name'\n"
   253  	}
   254  
   255  	for k := range params {
   256  		v := fmt.Sprint("'", strings.Join(params[k], "', '"), "'")
   257  		filter = filter + fmt.Sprintf("AND %s IN (%s)\n", k, v)
   258  	}
   259  
   260  	query := fmt.Sprintf(`SELECT test_infra.edge_jobs.elapsed as elapsed, test_infra.edge_jobs.started as started, test_infra.edge_jobs.finished as finished, test_infra.edge_jobs.version as version, test_infra.edge_jobs.path as path, test_infra.edge_jobs.job as job, test_infra.edge_jobs.workflow as workflow, test_infra.edge_jobs.number as number, test_infra.edge_jobs.tests_run as tests_run, test_infra.edge_jobs.tests_failed as tests_failed, test_infra.edge_jobs.passed as passed, test_infra.edge_jobs.repos as repos, test_infra.edge_jobs.repo_commit as repo_commit, test_infra.edge_job_metadatas.key as key, test_infra.edge_job_metadatas.value as value, test_infra.edge_job_metadatas.edge_job, test_infra.edge_jobs.id
   261  FROM test_infra.edge_jobs
   262  INNER JOIN test_infra.edge_job_metadatas 
   263  ON test_infra.edge_jobs.id=test_infra.edge_job_metadatas.edge_job
   264  WHERE test_infra.edge_jobs.workflow IN ('dsds-ci-nightly','hourly-slow-l2-test','hourly-stable-l2-test','hourly-serial-l2-test','hack-verify','iam-e2e','presubmit','presubmit-argo','postsubmit')
   265  %s
   266  ORDER BY test_infra.edge_jobs.started DESC LIMIT 100`, filter)
   267  
   268  	var tests []model.EdgeJob
   269  
   270  	rows, err := db.Query(query)
   271  	if err != nil {
   272  		return nil, err
   273  	}
   274  
   275  	for rows.Next() {
   276  		var test model.EdgeJob
   277  		err := rows.Scan(
   278  			&test.Elapsed,
   279  			&test.Started,
   280  			&test.Finished,
   281  			&test.Version,
   282  			&test.Path,
   283  			&test.Job,
   284  			&test.Workflow,
   285  			&test.Number,
   286  			&test.TestsRun,
   287  			&test.TestsFailed,
   288  			&test.Passed,
   289  			&test.Repos,
   290  			&test.RepoCommit,
   291  			&metaKey,
   292  			&metaValue,
   293  			&metaJob,
   294  			&id,
   295  		)
   296  		if err != nil {
   297  			return nil, err
   298  		}
   299  
   300  		tagline := fmt.Sprint(test.Repos, "/", test.Job)
   301  		if test.Job == "" {
   302  			test.Job = "argo"
   303  			tagline = fmt.Sprint(test.Job, "/", test.Workflow)
   304  		}
   305  		test.URLTagline = tagline
   306  
   307  		// generate the URL
   308  		test.URL = fmt.Sprint("/", test.Repos, "/", test.Workflow, "/", test.Job, "/", test.Number)
   309  		test.ElapsedSeconds = test.Elapsed.Seconds()
   310  		tests = append(tests, test)
   311  	}
   312  
   313  	return tests, rows.Err()
   314  }
   315  
   316  // GetRecentEdgeJobsByPlatform retrieves all jobs in the databases whose metadata has a value for the clusterVersion key
   317  // and maps the most recent test for each workflow-platform pair.
   318  func (db *DBHandle) GetRecentEdgeJobsByPlatform() (map[string]map[string]model.EdgeJob, error) {
   319  	query := `select test_infra.edge_jobs.started, test_infra.edge_jobs.number, test_infra.edge_jobs.job, test_infra.edge_jobs.repos, test_infra.edge_jobs.workflow, test_infra.edge_jobs.passed, test_infra.edge_jobs.tests_run, test_infra.edge_jobs.tests_failed, test_infra.edge_job_metadatas.value as platform, test_infra.edge_job_metadatas.key, test_infra.edge_job_metadatas.edge_job, test_infra.edge_jobs.id
   320  from test_infra.edge_jobs 
   321  INNER JOIN test_infra.edge_job_metadatas 
   322  ON test_infra.edge_jobs.id=test_infra.edge_job_metadatas.edge_job
   323  where test_infra.edge_job_metadatas.key = 'clusterVersion'
   324  ORDER BY test_infra.edge_jobs.started DESC
   325  LIMIT 100;`
   326  
   327  	// variables for storing columns necessary for the query, but not for the EdgeJob itself
   328  	var id, k, edgeJob string
   329  
   330  	// Each workflow will be the key to a map of EdgeJobs whose key is the platform
   331  	// e.g. tests["hourly-serial-l2-test"]["gke-1.27.14"] = EdgeJob with those values
   332  	tests := make(map[string]map[string]model.EdgeJob)
   333  
   334  	rows, err := db.Query(query)
   335  	if err != nil {
   336  		return nil, err
   337  	}
   338  
   339  	for rows.Next() {
   340  		var test model.EdgeJob
   341  		err := rows.Scan(
   342  			&test.Started,
   343  			&test.Number,
   344  			&test.Job,
   345  			&test.Repos,
   346  			&test.Workflow,
   347  			&test.Passed,
   348  			&test.TestsRun,
   349  			&test.TestsFailed,
   350  			&test.Platform,
   351  			&id,
   352  			&k,
   353  			&edgeJob,
   354  		)
   355  		if err != nil {
   356  			return nil, err
   357  		}
   358  
   359  		// generate the URL
   360  		if test.Job == "" {
   361  			test.Job = "argo"
   362  		}
   363  		test.URL = fmt.Sprint("/", test.Repos, "/", test.Workflow, "/", test.Job, "/", test.Number)
   364  
   365  		// Parse test.Platform into <platformType>-<platformVersion>, e.g. gke-1.29.6
   366  		if test.Platform != "" {
   367  			platformInfo := strings.Split(test.Platform, "-")
   368  			platformVersion := platformInfo[0]
   369  			platformType := strings.Split(platformInfo[1], ".")
   370  			platform := fmt.Sprintf("%s-%s", platformType[0], platformVersion)
   371  			test.Platform = platform
   372  
   373  			// Create map for workflow
   374  			if _, ok := tests[test.Workflow]; !ok {
   375  				tests[test.Workflow] = make(map[string]model.EdgeJob)
   376  			}
   377  
   378  			// Only add the test if that workflow_platform pair isn't in the map yet
   379  			if _, ok := tests[test.Workflow][platform]; !ok {
   380  				tests[test.Workflow][platform] = test
   381  			}
   382  		}
   383  	}
   384  
   385  	return tests, rows.Err()
   386  }
   387  
   388  // GetRecentEdgeJobs retrieves the latest 100 runs for a given job / repo
   389  func (db *DBHandle) GetRecentEdgeJobRuns(repo, workflow, job string) ([]model.EdgeJob, error) {
   390  	query := `SELECT elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
   391  FROM test_infra.edge_jobs AS jobs
   392  WHERE repos=$1 and job=$2 and workflow=$3
   393  ORDER BY jobs.started DESC
   394  LIMIT 100`
   395  
   396  	var tests []model.EdgeJob
   397  
   398  	rows, err := db.Query(query, repo, job, workflow)
   399  	if err != nil {
   400  		return nil, err
   401  	}
   402  
   403  	for rows.Next() {
   404  		var test model.EdgeJob
   405  		err := rows.Scan(
   406  			&test.Elapsed,
   407  			&test.Started,
   408  			&test.Finished,
   409  			&test.Version,
   410  			&test.Path,
   411  			&test.Job,
   412  			&test.Workflow,
   413  			&test.Number,
   414  			&test.TestsRun,
   415  			&test.TestsFailed,
   416  			&test.Passed,
   417  			&test.Repos,
   418  			&test.RepoCommit,
   419  		)
   420  		if err != nil {
   421  			return nil, err
   422  		}
   423  
   424  		test.ElapsedSeconds = test.Elapsed.Seconds()
   425  
   426  		tests = append(tests, test)
   427  	}
   428  
   429  	return tests, rows.Err()
   430  }
   431  
   432  // GetEdgeJob retrieves a specific job with its metadata and test data
   433  // TODO(ss185994): inner join or cte to gather all the data would be better than 3 separate selects
   434  func (db *DBHandle) GetEdgeJob(ctx context.Context, number string) (model.EdgeRun, error) {
   435  	// query := `
   436  	// SELECT
   437  	//     *
   438  	// FROM
   439  	// 	test_infra.edge_jobs jobs
   440  	// INNER JOIN test_infra.edge_job_metadatas meta
   441  	//     ON meta.edge_job = jobs.id
   442  	// INNER JOIN test_infra.edge_job_tests tests
   443  	//     ON tests.edge_job = jobs.id
   444  	// WHERE number = $1;
   445  	// `
   446  
   447  	var run model.EdgeRun
   448  
   449  	queryJobs := `
   450  	SELECT
   451  	id, elapsed, started, finished, version, path, job, workflow, number, tests_run, tests_failed, passed, repos, repo_commit
   452  	FROM test_infra.edge_jobs
   453  	where number=$1;
   454  	`
   455  
   456  	row := db.QueryRowContext(ctx, queryJobs, number)
   457  	err := row.Scan(
   458  		&run.EdgeJobID,
   459  		&run.Job.Elapsed,
   460  		&run.Job.Started,
   461  		&run.Job.Finished,
   462  		&run.Job.Version,
   463  		&run.Job.Path,
   464  		&run.Job.Job,
   465  		&run.Job.Workflow,
   466  		&run.Job.Number,
   467  		&run.Job.TestsRun,
   468  		&run.Job.TestsFailed,
   469  		&run.Job.Passed,
   470  		&run.Job.Repos,
   471  		&run.Job.RepoCommit,
   472  	)
   473  	if err != nil {
   474  		return model.EdgeRun{}, err
   475  	}
   476  
   477  	gcpURL := fmt.Sprint(gcpBucketLink, "actions/", run.Job.Repos, "/", run.Job.Workflow, "/", run.Job.Number, "/", run.Job.Job)
   478  	if run.Job.Job == "" {
   479  		gcpURL = fmt.Sprint(gcpBucketLink, "argo/", run.Job.Repos, "/", run.Job.Workflow, "/", run.Job.Number)
   480  	}
   481  	run.Job.GCPURL = gcpURL
   482  
   483  	run.Job.ElapsedSeconds = run.Job.Elapsed.Seconds()
   484  
   485  	run.Metadata, err = db.GetEdgeJobMetadata(run.EdgeJobID)
   486  	if err != nil {
   487  		fmt.Println(err)
   488  	}
   489  
   490  	run.Tests, err = db.GetEdgeJobTests(run.EdgeJobID)
   491  	if err != nil {
   492  		fmt.Println(err)
   493  	}
   494  
   495  	return run, nil
   496  }
   497  
   498  // GetEdgeJobMetadata retrieves metadata for a specific jobID
   499  func (db *DBHandle) GetEdgeJobMetadata(edgeJobID uuid.UUID) ([]model.EdgeJobMetadata, error) {
   500  	queryMetas := `
   501  	SELECT
   502  	edge_job, key, value
   503  	FROM test_infra.edge_job_metadatas
   504  	WHERE edge_job=$1;
   505  	`
   506  
   507  	metas := []model.EdgeJobMetadata{}
   508  
   509  	rows, err := db.Query(queryMetas, edgeJobID.String())
   510  	if err != nil {
   511  		return []model.EdgeJobMetadata{}, err
   512  	}
   513  	for rows.Next() {
   514  		var meta model.EdgeJobMetadata
   515  
   516  		err := rows.Scan(
   517  			&meta.EdgeJob,
   518  			&meta.Key,
   519  			&meta.Value,
   520  		)
   521  		if err != nil {
   522  			return []model.EdgeJobMetadata{}, err
   523  		}
   524  		metas = append(metas, meta)
   525  	}
   526  
   527  	return metas, nil
   528  }
   529  
   530  // GetRecentEdgeJobs retrieves all tests associated with a given jobID
   531  func (db *DBHandle) GetEdgeJobTests(edgeJobID uuid.UUID) ([]model.EdgeJobTest, error) {
   532  	queryTests := `
   533  	SELECT
   534  	name, suite, time, failed, failure_text
   535  	FROM test_infra.edge_job_tests
   536  	WHERE edge_job=$1;
   537  	`
   538  
   539  	var tests []model.EdgeJobTest
   540  
   541  	rows, err := db.Query(queryTests, edgeJobID)
   542  	if err != nil {
   543  		return tests, err
   544  	}
   545  
   546  	for rows.Next() {
   547  		var test model.EdgeJobTest
   548  		err := rows.Scan(
   549  			&test.Name,
   550  			&test.Suite,
   551  			&test.Time,
   552  			&test.Failed,
   553  			&test.FailureText,
   554  		)
   555  		if err != nil {
   556  			return tests, err
   557  		}
   558  		test.TimeSeconds = test.Time.Seconds()
   559  
   560  		tests = append(tests, test)
   561  	}
   562  	return tests, nil
   563  }
   564  

View as plain text