package server_test

import (
	"testing"
	"time"

	server "edge-infra.dev/pkg/edge/psqlinjector"
	"edge-infra.dev/pkg/f8n/kinform/model"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/x/postgres"
	"github.com/stretchr/testify/require"
)

// TestTxWithoutRowLocking exercises two watched fields that differ only by
// name (and therefore map to different watched_field_objects entries) and
// checks that interleaved transactions on them both succeed without one
// waiting on the other.
func TestTxWithoutRowLocking(t *testing.T) {
	var bannerProjectIDs = []string{"foo"}
	var handle server.DBHandle

	var wf1 = model.WatchedField{
		APIVersion: "foo/bar1baz2",
		Kind:       "NoisyKind",
		Timestamp:  time.Now().UTC().Truncate(time.Microsecond),
		Fields: []model.FieldValue{
			{
				JSONPath: "$.spec.foo",
				Value:    "{\"bar\": \"baz\"}",
				Missing:  false,
			},
		},
	}
	var wf2 = wf1
	// different names mean different watched_field_objects entries.
	wf1.Name = "bill"
	wf2.Name = "frank"

	var feat = f2.NewFeature(t.Name()).
		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
			var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
			wf1.Cluster = clusterEdgeIDs["foo"]
			wf2.Cluster = clusterEdgeIDs["foo"]
			return ctx
		}).
		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
			handle = server.DBHandle{
				DB: postgres.FromContextT(ctx, t).DB(),
			}
			return ctx
		}).
		Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
			require.NoError(t, handle.SetWatchedField(ctx, wf1), "error setting watched field one")
			require.NoError(t, handle.SetWatchedField(ctx, wf2), "error setting watched field two")
			validateWatchedFieldInDatabase(ctx, t, wf1)
			validateWatchedFieldInDatabase(ctx, t, wf2)
			return ctx
		}).
		Test("set watched fields for different objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
			// Bump the timestamp past the one used by the previous step.
			var now = time.Now().UTC().Truncate(time.Microsecond).Add(time.Microsecond)
			wf1.Timestamp = now
			wf2.Timestamp = now

			tx1, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)
			tx2, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)

			// start the first transaction
			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
			require.NoError(t, err, "could not upsert watched field object")

			// start and finish the second transaction
			id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
			require.NoError(t, err, "could not upsert in second transaction")
			require.NoError(t, server.TxSetWatchedFieldValues(tx2, wf2, id2), "could not write watched field values in second transaction")
			require.NoError(t, tx2.Commit(), "could not commit second transaction")

			// finish the first transaction
			require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not write watched field values in first transaction")
			require.NoError(t, tx1.Commit(), "could not commit first transaction")

			// validate both watched fields were written to the database.
			require.NotEqual(t, id1.String(), id2.String(), "the object_id should be different")
			validateWatchedFieldInDatabase(ctx, t, wf1)
			validateWatchedFieldInDatabase(ctx, t, wf2)
			return ctx
		}).
		Test("set and delete watched fields for different objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
			var now = time.Now().UTC().Truncate(time.Microsecond).Add(time.Microsecond)
			wf1.Timestamp = now
			wf2.Timestamp = now

			// start the first transaction
			tx1, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)
			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
			require.NoError(t, err, "could not upsert watched field object")

			// start and finish the second transaction (a delete of the other object)
			require.NoError(t, handle.DeleteWatchedField(ctx, wf2), "could not delete watched field in second transaction")

			// finish the first transaction
			require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not write watched field values in first transaction")
			require.NoError(t, tx1.Commit(), "could not commit first transaction")

			// validate the first watched field was written and the second was deleted.
			validateWatchedFieldInDatabase(ctx, t, wf1)
			validateWatchedFieldDeletedInDatabase(ctx, t, wf2)
			return ctx
		}).
		Feature()

	f2f.Test(t, feat)
}

// TestTxRowLockingSequential checks that two transactions touching the same
// watched field object are serialized: the second one (with a newer
// timestamp) must not make progress until the first commits, and must then
// succeed.
func TestTxRowLockingSequential(t *testing.T) {
	var bannerProjectIDs = []string{"foo"}
	var handle server.DBHandle

	var wf = model.WatchedField{
		APIVersion: "foo/bar1baz2",
		Kind:       "NoisyKind",
		Name:       "frank",
		Timestamp:  time.Now().UTC().Truncate(time.Microsecond),
		Fields: []model.FieldValue{
			{
				JSONPath: "$.spec.foo",
				Value:    "{\"bar\": \"baz\"}",
				Missing:  false,
			},
		},
	}

	var feat = f2.NewFeature(t.Name()).
		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
			var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
			wf.Cluster = clusterEdgeIDs["foo"]
			return ctx
		}).
		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
			handle = server.DBHandle{
				DB: postgres.FromContextT(ctx, t).DB(),
			}
			return ctx
		}).
		Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
			require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
			validateWatchedFieldInDatabase(ctx, t, wf)
			return ctx
		}).
		Test("set sequential watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
			var now = time.Now().UTC().Truncate(time.Microsecond)
			wf1, wf2 := wf, wf
			wf1.Timestamp = now
			wf2.Timestamp = now.Add(time.Microsecond)

			tx1, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)
			tx2, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)

			// The first transaction should lock the row.
			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
			require.NoError(t, err, "could not upsert watched field object")

			// Buffered to the number of sends so the goroutine can always run to
			// completion, even if this test goroutine stops receiving after a failure.
			var tx2ch = make(chan error, 3)
			go func() {
				// this should be blocked until the first transaction completes.
				id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
				tx2ch <- err
				tx2ch <- server.TxSetWatchedFieldValues(tx2, wf2, id2)
				tx2ch <- tx2.Commit()
				close(tx2ch)
			}()

			select {
			case <-time.After(time.Second): // row is locked
				// set the first transaction's watched field values
				require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
			case err := <-tx2ch:
				t.Logf("error returned by second transaction: %v", err)
				t.Fatal("second transaction should be blocked until the first completes")
			}

			select {
			case <-time.After(time.Second): // row is still locked
				// commit the first transaction
				require.NoError(t, tx1.Commit(), "could not commit first transaction")
			case err := <-tx2ch:
				t.Logf("error returned by second transaction: %v", err)
				t.Fatal("second transaction should be blocked until the first completes")
			}

			// The goroutine reports three results: upsert, values write, commit.
			var timeout = time.After(5 * time.Second)
			for i := 0; i < 3; i++ {
				select {
				case <-timeout:
					t.Fatalf("second transaction did not complete quick enough")
				case err := <-tx2ch:
					require.NoError(t, err, "second transaction failed")
				}
			}

			// validate the 2nd transaction's watched field was written to the database.
			validateWatchedFieldInDatabase(ctx, t, wf2)
			return ctx
		}).
		Test("set watched field object, then delete it in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
			var now = time.Now().UTC().Truncate(time.Microsecond)
			wf1, wf2 := wf, wf
			wf1.Timestamp = now
			wf2.Timestamp = now.Add(time.Microsecond)

			// The first transaction should lock the row.
			tx1, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)
			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
			require.NoError(t, err, "could not upsert watched field object")

			// Buffered so the goroutine never blocks on its single send.
			var tx2ch = make(chan error, 1)
			go func() {
				// this should be blocked until the first transaction completes.
				tx2ch <- handle.DeleteWatchedField(ctx, wf2)
				close(tx2ch)
			}()

			select {
			case <-time.After(time.Second): // row is locked
				// set the first transaction's watched field values
				require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
			case err := <-tx2ch:
				t.Logf("error returned by second transaction: %v", err)
				t.Fatal("second transaction should be blocked until the first completes")
			}

			select {
			case <-time.After(time.Second): // row is still locked
				// commit the first transaction
				require.NoError(t, tx1.Commit(), "could not commit first transaction")
			case err := <-tx2ch:
				t.Logf("error returned by second transaction: %v", err)
				t.Fatal("second transaction should be blocked until the first completes")
			}

			select {
			case <-time.After(time.Second):
				t.Fatalf("second transaction did not complete quick enough")
			case err := <-tx2ch:
				require.NoError(t, err, "second transaction failed")
			}

			// validate the 2nd transaction marked the watched field object as deleted in the database.
			validateWatchedFieldDeletedInDatabase(ctx, t, wf2)
			return ctx
		}).
		Feature()

	f2f.Test(t, feat)
}

// TestTxRowLockingOutdated checks that a transaction carrying an older
// timestamp for the same watched field object waits for the in-flight newer
// transaction and is then rejected with server.ErrIgnoredMessage, leaving the
// newer data in place.
func TestTxRowLockingOutdated(t *testing.T) {
	var bannerProjectIDs = []string{"foo"}
	var handle server.DBHandle

	var wf = model.WatchedField{
		APIVersion: "foo/bar1baz2",
		Kind:       "NoisyKind",
		Name:       "frank",
		Timestamp:  time.Now().UTC().Truncate(time.Microsecond),
		Fields: []model.FieldValue{
			{
				JSONPath: "$.spec.foo",
				Value:    "{\"bar\": \"baz\"}",
				Missing:  false,
			},
		},
	}

	var feat = f2.NewFeature(t.Name()).
		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
			var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
			wf.Cluster = clusterEdgeIDs["foo"]
			return ctx
		}).
		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
			handle = server.DBHandle{
				DB: postgres.FromContextT(ctx, t).DB(),
			}
			return ctx
		}).
		Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
			require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
			validateWatchedFieldInDatabase(ctx, t, wf)
			return ctx
		}).
		Test("set out-of-order watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
			var now = time.Now().UTC().Truncate(time.Microsecond)
			wf1, wf2 := wf, wf
			wf1.Timestamp = now.Add(time.Microsecond)
			wf2.Timestamp = now // wf2 is outdated

			tx1, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)
			tx2, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)

			// The first transaction should lock the row.
			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
			require.NoError(t, err, "could not upsert watched field object")

			// Buffered to the number of sends so the goroutine can always run to
			// completion, even if this test goroutine stops receiving after a failure.
			var tx2ch = make(chan error, 2)

			// requireNoID2 is assigned by the goroutine before its first send and is
			// only called here after the matching receive. require.* ends in
			// t.FailNow, which must run on the test goroutine, so the assertion is
			// deferred to this side of the channel instead of running in the goroutine.
			var requireNoID2 func()
			go func() {
				// this should be blocked until the first transaction completes.
				id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
				requireNoID2 = func() {
					require.Nil(t, id2, "second transaction should not return an object_id for the outdated watched field object")
				}
				tx2ch <- err
				tx2ch <- tx2.Commit() // this is a rollback in real code, but a commit should be safe too.
				close(tx2ch)
			}()

			select {
			case <-time.After(time.Second): // row is locked
				// set the first transaction's watched field values
				require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
			case err := <-tx2ch:
				t.Logf("error returned by second transaction: %v", err)
				t.Fatal("second transaction should be blocked until the first completes")
			}

			select {
			case <-time.After(time.Second): // row is still locked
				// commit the first transaction
				require.NoError(t, tx1.Commit(), "could not commit first transaction")
			case err := <-tx2ch:
				t.Logf("error returned by second transaction: %v", err)
				t.Fatal("second transaction should be blocked until the first completes")
			}

			// One deadline shared by both remaining results from the goroutine.
			var timeout = time.After(5 * time.Second)
			select {
			case <-timeout:
				t.Fatal("second transaction took too long to complete")
			case err := <-tx2ch:
				requireNoID2()
				// outdated watched fields should return ErrIgnoredMessage so that the pubsub message is acked without an error log.
				require.Error(t, err, "the second transaction should return an error")
				require.ErrorIs(t, err, server.ErrIgnoredMessage, "the second transaction should not upsert anything")
			}

			select {
			case <-timeout:
				t.Fatal("second transaction took too long to commit")
			case err := <-tx2ch:
				require.NoError(t, err, "second transaction commit failed")
			}

			// validate the 1st transaction's watched field is still in the database.
			validateWatchedFieldInDatabase(ctx, t, wf1)
			return ctx
		}).
		Test("set and delete out-of-order watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
			var now = time.Now().UTC().Truncate(time.Microsecond)
			wf1, wf2 := wf, wf
			wf1.Timestamp = now.Add(time.Microsecond)
			wf2.Timestamp = now // wf2 is outdated

			// The first transaction should lock the row.
			tx1, err := handle.DB.BeginTx(ctx, nil)
			require.NoError(t, err)
			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
			require.NoError(t, err, "could not upsert watched field object")

			// Buffered so the goroutine never blocks on its single send.
			var tx2ch = make(chan error, 1)
			go func() {
				// this should be blocked until the first transaction completes.
				tx2ch <- handle.DeleteWatchedField(ctx, wf2)
				close(tx2ch)
			}()

			select {
			case <-time.After(time.Second): // row is locked
				// set the first transaction's watched field values
				require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
			case err := <-tx2ch:
				t.Logf("error returned by second transaction: %v", err)
				t.Fatal("second transaction should be blocked until the first completes")
			}

			select {
			case <-time.After(time.Second): // row is still locked
				// commit the first transaction
				require.NoError(t, tx1.Commit(), "could not commit first transaction")
			case err := <-tx2ch:
				t.Logf("error returned by second transaction: %v", err)
				t.Fatal("second transaction should be blocked until the first completes")
			}

			select {
			case <-time.After(time.Second):
				t.Fatal("second transaction took too long to complete")
			case err := <-tx2ch:
				// outdated watched fields should return ErrIgnoredMessage so that the pubsub message is acked without an error log.
				require.ErrorIs(t, err, server.ErrIgnoredMessage, "the second transaction should return an ignored message error for outdated deletes")
			}

			// validate the 1st transaction's watched field is still in the database.
			validateWatchedFieldInDatabase(ctx, t, wf1)
			return ctx
		}).
		Feature()

	f2f.Test(t, feat)
}