package server_test import ( "context" "testing" "time" server "edge-infra.dev/pkg/edge/psqlinjector" "edge-infra.dev/pkg/f8n/kinform/model" "edge-infra.dev/test/f2" "cloud.google.com/go/pubsub" "github.com/google/uuid" "github.com/stretchr/testify/require" ) func TestInjectorClusterHeartbeat(t *testing.T) { var cfg = new(server.Config) var bannerProjectIDs = []string{"foo", "bar", "baz"} // We set all the cluster's clusters.infra_status_updated_at to this time, then check that it's updated. var start = time.Now().UTC().Truncate(time.Microsecond) // truncated due to precision https://www.postgresql.org/docs/current/datatype-datetime.html t.Logf("start time %v", start) // map[project_id] var clusterEdgeIDs map[string]uuid.UUID // teardown psqlinjector plumbing var doneCtx, stopInjector = context.WithCancel(context.Background()) var runErrCh chan error // created when the "run" test executes. var feat = f2.NewFeature(t.Name()). Setup("config", func(ctx f2.Context, t *testing.T) f2.Context { return setupConfig(ctx, t, cfg) }). Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context { return createKinformTopics(ctx, t, cfg, bannerProjectIDs...) }). Setup("db", func(ctx f2.Context, t *testing.T) f2.Context { clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...) return ctx }). Test("run", func(ctx f2.Context, t *testing.T) f2.Context { pi, err := server.New(cfg) require.NoError(t, err, "could not create injector") runErrCh = make(chan error, 1) go func() { runErrCh <- pi.Run(doneCtx) }() return ctx }). Test("send heartbeat", func(ctx f2.Context, t *testing.T) f2.Context { return sendHeartbeats(ctx, t, cfg, start, clusterEdgeIDs) }). Test("verify infra_status_updated_at times", func(ctx f2.Context, t *testing.T) f2.Context { // allow time for the messages to flow from pubsub to psqlinjector to the database. var timeout = time.After(5 * time.Second) for { select { case <-time.After(50 * time.Millisecond): case <-timeout: t.Fatalf("did not update status times quick enough") } var updated = true var updatedTimes = selectInfraStatusUpdatedAt(ctx, t, clusterEdgeIDs) for _, updatedAt := range updatedTimes { if !start.Equal(updatedAt) { updated = false } } if updated { t.Logf("found updated times %v", updatedTimes) return ctx } } }). Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context { stopInjector() // before receiving from the runErrCh channel, check that the "run" test created the channel. require.NotNil(t, runErrCh, "psqlinjector never started") // When managers stop due to context canceled they don't return an error. var err = <-runErrCh require.NoErrorf(t, err, "psqlinjector.Run returned unexpected error: %v", err) return ctx }). Feature() f2f.Test(t, feat) } func sendHeartbeats(ctx f2.Context, t *testing.T, cfg *server.Config, timestamp time.Time, clusterEdgeIDs map[string]uuid.UUID) f2.Context { // These checks ensure that the timestamp will equal the time scanned from the database. require.True(t, timestamp.Equal(timestamp.Truncate(time.Microsecond)), "postgres has a time precision of 1 microsecond") require.Equal(t, timestamp.Location(), time.UTC, "postgres uses UTC") var sessionID = uuid.New() for p, ceid := range clusterEdgeIDs { var client = createPubSubClient(ctx, t, p) var msg = model.ClusterHeartbeat{ Cluster: ceid, Timestamp: timestamp, SessionID: sessionID, } var data, _ = msg.Data() result := client.Topic(cfg.TopicID).Publish(ctx, &pubsub.Message{ Attributes: map[string]string{ model.AttrPayloadType: msg.PayloadType(), model.AttrClusterUUID: ceid.String(), }, Data: data, }) _, err := result.Get(ctx) require.NoError(t, err, "error publishing heartbeat") client.Topic(cfg.TopicID).Flush() } return ctx }