package server_test import ( "testing" "time" server "edge-infra.dev/pkg/edge/psqlinjector" "edge-infra.dev/pkg/f8n/kinform/model" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/x/postgres" "github.com/google/uuid" "github.com/stretchr/testify/require" ) func validateWatchedFieldInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) { var db = postgres.FromContextT(ctx, t).DB() const selectWatchedFieldObjectID = "SELECT object_id, watched_at FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5) AND deleted IS FALSE" const selectWatchedFieldValues = "SELECT jsonpath, value, missing FROM watched_field_values WHERE object_id = $1" var timestamp time.Time var objectID uuid.UUID var row = db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace) require.NoError(t, row.Scan(&objectID, ×tamp), "error getting watched_field_objects object_id and watched_at") require.True(t, timestamp.Equal(wf.Timestamp), "timestamp does not match watched_field_objects.watched_at") var m = make(map[string]model.FieldValue) // map[JSONPath]model.FieldValue var rows, err = db.QueryContext(ctx, selectWatchedFieldValues, objectID.String()) require.NoError(t, err, "error selecting watched field values") for rows.Next() { var fv model.FieldValue require.NoError(t, rows.Scan(&fv.JSONPath, &fv.Value, &fv.Missing), "error scanning watched_field_values") m[fv.JSONPath] = fv } require.NoError(t, rows.Err()) require.Equal(t, len(m), len(wf.Fields)) for _, fv := range wf.Fields { var x = m[fv.JSONPath] require.Equal(t, x.JSONPath, fv.JSONPath) require.Equal(t, x.Value, fv.Value) require.Equal(t, x.Missing, fv.Missing) } } func validateWatchedFieldDeletedInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) { var db = postgres.FromContextT(ctx, t).DB() const selectWatchedFieldObjectID = "SELECT watched_at, deleted FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5)" var ( timestamp time.Time deleted bool ) var row = db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace) require.NoError(t, row.Scan(×tamp, &deleted), "error getting watched_field_objects object_id and watched_at") require.True(t, timestamp.Equal(wf.Timestamp), "timestamp does not match watched_field_objects.watched_at") require.True(t, deleted, "the object is not marked deleted") } func validateWatchedFieldGarbageCollectedInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) { var db = postgres.FromContextT(ctx, t).DB() const selectWatchedFieldObjectID = "SELECT COUNT(*) FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5)" var count int require.NoError(t, db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace).Scan(&count)) require.Equal(t, 0, count, "watched field not gargage collected") } func validateWatchedFieldObjectsCount(ctx f2.Context, t *testing.T, expected int) { var actual int var handle = server.DBHandle{ DB: postgres.FromContextT(ctx, t).DB(), } const stmtCountObjects = `SELECT COUNT(*) FROM watched_field_objects WHERE deleted IS FALSE` require.NoError(t, handle.DB.QueryRow(stmtCountObjects).Scan(&actual), "error counting watched_field_objects table") require.Equal(t, expected, actual, "the amount of watched field objects is incorrect") } func validateWatchedFieldValuesCount(ctx f2.Context, t *testing.T, expected int) { var actual int var handle = server.DBHandle{ DB: postgres.FromContextT(ctx, t).DB(), } const stmtCountValues = `SELECT COUNT(*) FROM watched_field_values WHERE object_id IN (SELECT object_id FROM watched_field_objects WHERE deleted IS FALSE)` require.NoError(t, handle.DB.QueryRow(stmtCountValues).Scan(&actual), "error counting watched_field_values table") require.Equal(t, expected, actual, "the amount of watched field values is incorrect") } func TestSQLSetClusterHeartbeatTime(t *testing.T) { var bannerProjectIDs = []string{"foo", "bar", "baz"} // map[project_id]cluster_edge_id var clusterEdgeIDs map[string]uuid.UUID var feat = f2.NewFeature(t.Name()). Setup("db", func(ctx f2.Context, t *testing.T) f2.Context { clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...) return ctx }). Test("set cluster heartbeat time", func(ctx f2.Context, t *testing.T) f2.Context { var handle = server.DBHandle{ DB: postgres.FromContextT(ctx, t).DB(), } var statusTimes = make(map[string]time.Time) for p, ceid := range clusterEdgeIDs { statusTimes[p] = time.Now().UTC().Truncate(time.Microsecond) err := handle.SetClusterHeartbeatTime(ctx, statusTimes[p], ceid) require.NoError(t, err, "error setting cluster heartbeat time") } // Check that the scanned times equal the set times. for p, updatedAt := range selectInfraStatusUpdatedAt(ctx, t, clusterEdgeIDs) { require.Equal(t, updatedAt, statusTimes[p], "infra_status_updated_at not equal to set value") } return ctx }). Feature() f2f.Test(t, feat) } func TestSQLGetProjectIDs(t *testing.T) { var bannerProjectIDs = []string{"foo", "bar", "baz"} var handle server.DBHandle var feat = f2.NewFeature(t.Name()). Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context { handle = server.DBHandle{ DB: postgres.FromContextT(ctx, t).DB(), } return ctx }). Test("empty", func(ctx f2.Context, t *testing.T) f2.Context { foundProjectIDs, err := handle.GetBannerProjectIDs(ctx) require.NoError(t, err, "error getting project IDs from DBHandle") require.Empty(t, foundProjectIDs, "got project ids before db was populated") return ctx }). Test("GetBannerProjectIDs", func(ctx f2.Context, t *testing.T) f2.Context { populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...) foundProjectIDs, err := handle.GetBannerProjectIDs(ctx) require.NoError(t, err, "error getting project IDs from DBHandle") require.ElementsMatch(t, bannerProjectIDs, foundProjectIDs, "got unknown or missing project ids") return ctx }). Feature() f2f.Test(t, feat) } func TestSQLSetWatchedField(t *testing.T) { var bannerProjectIDs = []string{"foo", "bar", "baz"} var handle server.DBHandle var clusterEdgeIDs map[string]uuid.UUID var wf = model.WatchedField{ APIVersion: "hello/world", Kind: "MyKind", Name: "asdf", Timestamp: time.Now().UTC().Truncate(time.Microsecond), Fields: []model.FieldValue{ model.FieldValue{ JSONPath: "$.spec.foo", Value: "{\"bar\": \"baz\"}", Missing: false, }, }, } var feat = f2.NewFeature(t.Name()). Setup("db", func(ctx f2.Context, t *testing.T) f2.Context { clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...) wf.Cluster = clusterEdgeIDs["foo"] return ctx }). Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context { handle = server.DBHandle{ DB: postgres.FromContextT(ctx, t).DB(), } return ctx }). Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context { require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field") validateWatchedFieldInDatabase(ctx, t, wf) return ctx }). Test("set more watched fields", func(ctx f2.Context, t *testing.T) f2.Context { wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond) wf.Fields = append(wf.Fields, model.FieldValue{ JSONPath: "$.foo.bar", Value: "\"asdf\"", Missing: false, }) require.NoError(t, handle.SetWatchedField(ctx, wf)) validateWatchedFieldInDatabase(ctx, t, wf) return ctx }). Test("set watched fields but cause one to be deleted", func(ctx f2.Context, t *testing.T) f2.Context { var existing = wf.Fields[0] wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond) wf.Fields = []model.FieldValue{ existing, model.FieldValue{ JSONPath: "$.another.one", Missing: true, }, } require.NoError(t, handle.SetWatchedField(ctx, wf)) validateWatchedFieldInDatabase(ctx, t, wf) return ctx }). Test("set with only the timestamp updated", func(ctx f2.Context, t *testing.T) f2.Context { wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond) require.NoError(t, handle.SetWatchedField(ctx, wf)) validateWatchedFieldInDatabase(ctx, t, wf) return ctx }). Feature() f2f.Test(t, feat) } func TestSQLDeleteWatchedField(t *testing.T) { var bannerProjectIDs = []string{"foo"} var handle server.DBHandle var clusterEdgeIDs map[string]uuid.UUID var now = time.Now().UTC().Truncate(time.Microsecond) var wf = model.WatchedField{ APIVersion: "hello/world", Kind: "MyKind", Name: "asdf", Timestamp: now, Fields: []model.FieldValue{ model.FieldValue{ JSONPath: "$.spec.foo", Value: "{\"bar\": \"baz\"}", Missing: false, }, }, } var deleted = model.WatchedField{ APIVersion: "hello/world", Kind: "MyKind", Name: "deleted", Timestamp: now, Fields: []model.FieldValue{ model.FieldValue{ JSONPath: "$.spec.foo", Value: "{\"bar\": \"baz\"}", Missing: false, }, }, } var feat = f2.NewFeature(t.Name()). Setup("db", func(ctx f2.Context, t *testing.T) f2.Context { clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...) wf.Cluster = clusterEdgeIDs["foo"] deleted.Cluster = clusterEdgeIDs["foo"] return ctx }). Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context { handle = server.DBHandle{ DB: postgres.FromContextT(ctx, t).DB(), } return ctx }). Test("set watched fields", func(ctx f2.Context, t *testing.T) f2.Context { require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field") require.NoError(t, handle.SetWatchedField(ctx, deleted), "error setting watched field") validateWatchedFieldInDatabase(ctx, t, wf) validateWatchedFieldInDatabase(ctx, t, deleted) // check initial counts validateWatchedFieldObjectsCount(ctx, t, 2) validateWatchedFieldValuesCount(ctx, t, 2) return ctx }). Test("delete watched field", func(ctx f2.Context, t *testing.T) f2.Context { deleted.Timestamp = now.Add(time.Microsecond) require.NoError(t, handle.DeleteWatchedField(ctx, deleted)) validateWatchedFieldDeletedInDatabase(ctx, t, deleted) validateWatchedFieldObjectsCount(ctx, t, 1) validateWatchedFieldValuesCount(ctx, t, 1) return ctx }). Feature() f2f.Test(t, feat) } func TestDeleteOutdatedWatchedFieldObjects(t *testing.T) { var bannerProjectIDs = []string{"foo", "bar", "baz", "neo", "zed"} var handle server.DBHandle var clusterEdgeIDs map[string]uuid.UUID var wfs []model.WatchedField var now = time.Now().UTC().Truncate(time.Microsecond) var feat = f2.NewFeature(t.Name()). Setup("db", func(ctx f2.Context, t *testing.T) f2.Context { clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...) handle = server.DBHandle{ DB: postgres.FromContextT(ctx, t).DB(), } return ctx }). Test("create and set watched field objects", func(ctx f2.Context, t *testing.T) f2.Context { var apiVersions = []string{ "hello.world/v1", "hello.world/v1beta1", "foo.bar/v2", "v1", } var kinds = []string{ "Dog", "Cat", "Fish", "Bird", "Zebra", "AntEater", } // Postgres has microsecond precision for times. The scrape messages will be within this microsecond window. var timestamps = []time.Time{ now.Add(-time.Microsecond), // before now.Add(time.Microsecond), // after } for i := 0; i < 42; i++ { var wf = model.WatchedField{ Cluster: clusterEdgeIDs[bannerProjectIDs[i%len(clusterEdgeIDs)]], APIVersion: apiVersions[i%len(apiVersions)], Kind: kinds[i%len(kinds)], Name: uuid.New().String(), Timestamp: timestamps[i%2], Fields: []model.FieldValue{ model.FieldValue{ JSONPath: "$.spec.foo", Value: "{\"bar\": \"baz\"}", Missing: false, }, }, } require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field") validateWatchedFieldInDatabase(ctx, t, wf) wfs = append(wfs, wf) } validateWatchedFieldObjectsCount(ctx, t, len(wfs)) return ctx }). Test("handle a ScrapeMessage that does not mark any objects deleted", func(ctx f2.Context, t *testing.T) f2.Context { for _, cid := range clusterEdgeIDs { var sm = &model.ScrapeMessage{ Cluster: cid, StartTime: now.Add(-time.Minute), } require.NoError(t, handle.DeleteOutdatedWatchedFieldObjects(ctx, *sm)) } validateWatchedFieldObjectsCount(ctx, t, len(wfs)) return ctx }). Test("handle a ScrapeMessage for each cluster to delete outdated objects", func(ctx f2.Context, t *testing.T) f2.Context { for _, cid := range clusterEdgeIDs { var sm = &model.ScrapeMessage{ Cluster: cid, StartTime: now, } require.NoError(t, handle.DeleteOutdatedWatchedFieldObjects(ctx, *sm)) } return ctx }). Test("verify outdated objects were deleted", func(ctx f2.Context, t *testing.T) f2.Context { var expectedCount int for _, wf := range wfs { if wf.Timestamp.Before(now) { // this message should have been deleted by the ScrapeMessage. // before calling validate, update the watched field's timestamp to match the ScrapeMessage. wf.Timestamp = now validateWatchedFieldDeletedInDatabase(ctx, t, wf) } else { // ensure the good objects remained untouched. validateWatchedFieldInDatabase(ctx, t, wf) expectedCount++ } } validateWatchedFieldObjectsCount(ctx, t, expectedCount) return ctx }). Feature() f2f.Test(t, feat) } func TestGarbageCollectDeletedWatchedFieldObjects(t *testing.T) { var bannerProjectIDs = []string{"foo", "bar", "baz", "neo", "zed"} var handle server.DBHandle var clusterEdgeIDs map[string]uuid.UUID var wfs []model.WatchedField var feat = f2.NewFeature(t.Name()). Setup("db", func(ctx f2.Context, t *testing.T) f2.Context { clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...) handle = server.DBHandle{ DB: postgres.FromContextT(ctx, t).DB(), } return ctx }). Test("create and set watched field objects", func(ctx f2.Context, t *testing.T) f2.Context { var apiVersions = []string{ "hello.world/v1", "hello.world/v1beta1", "foo.bar/v2", "v1", } var kinds = []string{ "Dog", "Cat", "Fish", "Bird", "Zebra", "AntEater", } const delta = time.Hour + 3*time.Minute var timestamp = time.Now().UTC().Truncate(time.Microsecond).Add(-delta) // should be deleted. for i := 0; i < 40; i++ { var wf = model.WatchedField{ Cluster: clusterEdgeIDs[bannerProjectIDs[i%len(clusterEdgeIDs)]], APIVersion: apiVersions[i%len(apiVersions)], Kind: kinds[i%len(kinds)], Name: uuid.New().String(), Timestamp: timestamp, Fields: []model.FieldValue{ model.FieldValue{ JSONPath: "$.spec.foo", Value: "{\"bar\": \"baz\"}", Missing: false, }, }, } require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field") validateWatchedFieldInDatabase(ctx, t, wf) // mark half of the watched fields deleted if i%4 == 0 { // garbage collect 1/4th of the watched fields wf.Timestamp = timestamp.Add(time.Microsecond) require.NoError(t, handle.DeleteWatchedField(ctx, wf), "error marking watched field deleted") validateWatchedFieldDeletedInDatabase(ctx, t, wf) } else if i%2 == 0 { // recently deleted watched fields are not garbage collected. wf.Timestamp = timestamp.Add(delta) require.NoError(t, handle.DeleteWatchedField(ctx, wf), "error marking watched field deleted") validateWatchedFieldDeletedInDatabase(ctx, t, wf) } wfs = append(wfs, wf) } validateWatchedFieldObjectsCount(ctx, t, len(wfs)/2) return ctx }). Test("garbage collect the deleted watched fields", func(ctx f2.Context, t *testing.T) f2.Context { actual, err := handle.GarbageCollectDeletedWatchedFieldObjects(ctx) require.NoError(t, err, "could not garbage collect outdated watched field objects") require.Equal(t, actual, len(wfs)/4, "did not garbace collect 1/4th of the watched field objects") for i, wf := range wfs { if i%4 == 0 { validateWatchedFieldGarbageCollectedInDatabase(ctx, t, wf) } else if i%2 == 0 { validateWatchedFieldDeletedInDatabase(ctx, t, wf) } else { validateWatchedFieldInDatabase(ctx, t, wf) } } return ctx }). Feature() f2f.Test(t, feat) }