package sql

import (
	"context"
	"database/sql"
	"fmt"
	"net"
	"path"
	"testing"
	"time"

	"github.com/bazelbuild/rules_go/go/runfiles"
	pgsql "github.com/fergusstrange/embedded-postgres"
	"github.com/google/uuid"
	_ "github.com/jackc/pgx/v4/stdlib" // nolint:revive necessary for db driver

	"edge-infra.dev/pkg/f8n/kinform/model"
	sovereign "edge-infra.dev/pkg/f8n/sovereign/model"
	"edge-infra.dev/pkg/lib/build/bazel"
	"edge-infra.dev/pkg/lib/compression"
)

var (
	skipAll = false
	pgDSN   string
)

// TestMain unpacks the embedded Postgres archive from runfiles, starts a
// Postgres instance on a free local port, and runs the tests against it.
func TestMain(m *testing.M) {
	embeddedTxzFile, err := runfiles.Rlocation(path.Join("edge_infra", "hack", "tools", "postgres.txz"))
	if err != nil {
		panic(err)
	}

	tempDir, err := bazel.NewTestTmpDir("edge-infra-kinform-sql-test-*")
	if err != nil {
		panic(err)
	}
	pgRuntimePath := path.Join(tempDir, "runtime")
	pgTempDir := tempDir

	err = compression.DecompressTarXz(embeddedTxzFile, tempDir)
	if err != nil {
		panic(err)
	}

	pgUser := "postgres"
	pgPass := "postgres"
	pgHost := "127.0.0.1"
	pgDB := "postgres"

	port, err := findOpenPort()
	if err != nil {
		panic(err)
	}

	pgDSN = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", pgHost, port, pgUser, pgPass, pgDB)

	cfg := pgsql.DefaultConfig()
	cfg = cfg.Username(pgUser)
	cfg = cfg.Password(pgPass)
	cfg = cfg.Port(uint32(port)) /* #nosec G115 */
	cfg = cfg.Version(pgsql.V14)
	cfg = cfg.RuntimePath(pgRuntimePath)
	cfg = cfg.BinariesPath(pgTempDir)
	cfg = cfg.Database(pgDB)

	pg := pgsql.NewDatabase(cfg)
	if err := pg.Start(); err != nil {
		panic(err)
	}
	defer func() {
		if err := pg.Stop(); err != nil {
			panic(err)
		}
	}()

	m.Run()
}

// findOpenPort asks the OS for a free TCP port by listening on port 0,
// then releases the listener and returns the assigned port.
func findOpenPort() (int, error) {
	addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}

	l, err := net.ListenTCP("tcp", addr)
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

func TestIntegration(t *testing.T) {
	// TODO(dk185217): Keep a close eye on https://github.com/ncrvoyix-swt-retail/edge-infra/pull/4018 and add support asap
	// Integrate with f2 framework
	t.SkipNow()
}

// TODO(dk185217): label/mark this test as slow/depends on sql. skip for now to avoid slowness
func TestLabelMatching(t *testing.T) {
	if skipAll { // sorry, this is just avoiding the unused variables lint if it unconditionally skips
		t.SkipNow()
	}
	t.Logf("using dsn %s\n", pgDSN)
	db, err := sql.Open("pgx", pgDSN)
	if err != nil {
		t.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		t.Fatal(err)
	}

	// executing schema should be idempotent
	err = execSchema(db)
	if err != nil {
		t.Fatal(err)
	}

	ctx := context.Background()
	dbHandle := &DBHandle{DB: db}

	// create a cluster and add a label
	clusterID, err := dbHandle.InsertCluster(ctx, "test-cluster")
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("inserted new cluster. id: %v", clusterID)

	_, err = dbHandle.InsertClusterLabel(ctx, clusterID, "test", "true")
	if err != nil {
		t.Fatal(err)
	}

	// no artifact_versions yet, so there should be no matches
	clusters, err := dbHandle.GetClustersMatchingArtifactLabels(ctx, uuid.Nil)
	if err != nil {
		t.Fatal(err)
	}
	if len(clusters) != 0 {
		t.Fatal("found clusters matching nil artifact. this is unexpected")
	}

	// create test artifact
	nilDigest := "0000000000000000000000000000000000000000000000000000000000000000" // len = 64 constraint
	artifactVersionID, err := dbHandle.InsertArtifactVersion(ctx, "kinform-test", "latest", nilDigest)
	if err != nil {
		t.Fatal(err)
	}

	artifact := sovereign.Artifact{
		ProjectID:       "ret-edge-test",
		Repository:      "workloads",
		ArtifactVersion: artifactVersionID,
	}
	artifactID, err := dbHandle.InsertArtifact(ctx, artifact)
	if err != nil {
		t.Fatal(err)
	}

	// create and label a specific version of that artifact
	_, err = dbHandle.InsertArtifactLabel(ctx, artifactID, "test", "true")
	if err != nil {
		t.Fatal(err)
	}

	// check again that the cluster and artifact match
	clusters, err = dbHandle.GetClustersMatchingArtifactLabels(ctx, artifactID)
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("found %v clusters with labels matching artifact with id %v", len(clusters), artifactID)
	if len(clusters) != 1 {
		t.Fatalf("expected artifact %v to have matched the test cluster %v", artifactID, clusterID)
	}
}

func TestConnectCluster(t *testing.T) {
	if skipAll { // sorry, this is just avoiding the unused variables lint if it unconditionally skips
		t.SkipNow()
	}
	t.Logf("using dsn %s\n", pgDSN)
	db, err := sql.Open("pgx", pgDSN)
	if err != nil {
		t.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		t.Fatal(err)
	}

	err = execSchema(db)
	if err != nil {
		t.Fatal(err)
	}

	ctx := context.Background()
	dbHandle := &DBHandle{DB: db}

	clusterID := uuid.New()
	sessionID := uuid.New()

	heartbeat := model.ClusterHeartbeat{
		Cluster:        clusterID,
		ClusterVersion: model.ClusterVersionInfo{},
		Timestamp:      time.Now(),
		SessionID:      sessionID,
	}

	err = dbHandle.UpdateClusterHeartbeatWithSession(ctx, heartbeat)
	if err != nil {
		t.Fatal(err)
	}

	// should be able to update the session using the same session id as many times as we want
	for i := 0; i < 50; i++ {
		heartbeat.Timestamp = time.Now()
		err = dbHandle.UpdateClusterHeartbeatWithSession(ctx, heartbeat)
		if err != nil {
			t.Fatal(err)
		}
	}

	// assert there's only one session
	testQuery := `SELECT count(*) FROM kinform_sessions WHERE cluster = $1`
	row := db.QueryRowContext(ctx, testQuery, clusterID)
	var rowCount int
	err = row.Scan(&rowCount)
	if err != nil {
		t.Fatal(err)
	}
	if rowCount != 1 {
		t.Fatalf("expected to find 1 row. got %d", rowCount)
	}
}

// TODO(dk185217): temporarily using this to check if tests are independent of each other. if they aren't, this
// will violate the unique constraint on cluster id. remove after integrating with f2 and defining db sharing behavior
// func tryCreateCluster(db *DBHandle) {
// 	insertSQL := `
// 	INSERT INTO clusters(id, name, version_major, version_minor)
// 	VALUES ($4, $1, $2, $3)
// 	`
// 	_, err := db.Exec(insertSQL, "non-unique", 0, 0, uuid.Nil)
// 	if err != nil {
// 		panic(err)
// 	}
// }