...

Source file src/edge-infra.dev/pkg/edge/psqlinjector/integration/sql_tx_test.go

Documentation: edge-infra.dev/pkg/edge/psqlinjector/integration

     1  package server_test
     2  
     3  import (
     4  	"testing"
     5  	"time"
     6  
     7  	server "edge-infra.dev/pkg/edge/psqlinjector"
     8  	"edge-infra.dev/pkg/f8n/kinform/model"
     9  	"edge-infra.dev/test/f2"
    10  	"edge-infra.dev/test/f2/x/postgres"
    11  
    12  	"github.com/stretchr/testify/require"
    13  )
    14  
    15  func TestTxWithoutRowLocking(t *testing.T) {
    16  	var bannerProjectIDs = []string{"foo"}
    17  	var handle server.DBHandle
    18  	var wf1 = model.WatchedField{
    19  		APIVersion: "foo/bar1baz2",
    20  		Kind:       "NoisyKind",
    21  		Timestamp:  time.Now().UTC().Truncate(time.Microsecond),
    22  		Fields: []model.FieldValue{
    23  			model.FieldValue{
    24  				JSONPath: "$.spec.foo",
    25  				Value:    "{\"bar\": \"baz\"}",
    26  				Missing:  false,
    27  			},
    28  		},
    29  	}
    30  	var wf2 = wf1
    31  
    32  	// different names mean different watched_field_objects entries.
    33  	wf1.Name = "bill"
    34  	wf2.Name = "frank"
    35  
    36  	var feat = f2.NewFeature(t.Name()).
    37  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
    38  			var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
    39  			wf1.Cluster = clusterEdgeIDs["foo"]
    40  			wf2.Cluster = clusterEdgeIDs["foo"]
    41  			return ctx
    42  		}).
    43  		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
    44  			handle = server.DBHandle{
    45  				DB: postgres.FromContextT(ctx, t).DB(),
    46  			}
    47  			return ctx
    48  		}).
    49  		Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
    50  			require.NoError(t, handle.SetWatchedField(ctx, wf1), "error setting watched field one")
    51  			require.NoError(t, handle.SetWatchedField(ctx, wf2), "error setting watched field two")
    52  			validateWatchedFieldInDatabase(ctx, t, wf1)
    53  			validateWatchedFieldInDatabase(ctx, t, wf2)
    54  			return ctx
    55  		}).
    56  		Test("set watched fields for different objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
    57  			var now = time.Now().UTC().Truncate(time.Microsecond).Add(time.Microsecond)
    58  			wf1.Timestamp = now
    59  			wf2.Timestamp = now
    60  
    61  			tx1, err := handle.DB.BeginTx(ctx, nil)
    62  			require.NoError(t, err)
    63  			tx2, err := handle.DB.BeginTx(ctx, nil)
    64  			require.NoError(t, err)
    65  
    66  			// start the first transaction
    67  			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
    68  			require.NoError(t, err, "could not upsert watched field object")
    69  
    70  			// start and finish the second transaction
    71  			id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
    72  			require.NoError(t, err, "could not upsert in second transaction")
    73  			require.NoError(t, server.TxSetWatchedFieldValues(tx2, wf2, id2), "could not write watched field values in second transaction")
    74  			require.NoError(t, tx2.Commit(), "could not commit second transaction")
    75  
    76  			// finish the first transaction
    77  			require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not write watched field values in first transaction")
    78  			require.NoError(t, tx1.Commit(), "could not commit first transaction")
    79  
    80  			// validate both watched fields were written to the database.
    81  			require.NotEqual(t, id1.String(), id2.String(), "the object_id should be different")
    82  			validateWatchedFieldInDatabase(ctx, t, wf1)
    83  			validateWatchedFieldInDatabase(ctx, t, wf2)
    84  			return ctx
    85  		}).
    86  		Test("set and delete watched fields for different objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
    87  			var now = time.Now().UTC().Truncate(time.Microsecond).Add(time.Microsecond)
    88  			wf1.Timestamp = now
    89  			wf2.Timestamp = now
    90  
    91  			// start the first transaction
    92  			tx1, err := handle.DB.BeginTx(ctx, nil)
    93  			require.NoError(t, err)
    94  			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
    95  			require.NoError(t, err, "could not upsert watched field object")
    96  
    97  			// start and finish the second transaction
    98  			require.NoError(t, handle.DeleteWatchedField(ctx, wf2), "could not upsert in second transaction")
    99  
   100  			// finish the first transaction
   101  			require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not write watched field values in first transaction")
   102  			require.NoError(t, tx1.Commit(), "could not commit first transaction")
   103  
   104  			// validate both watched fields were written to the database.
   105  			validateWatchedFieldInDatabase(ctx, t, wf1)
   106  			validateWatchedFieldDeletedInDatabase(ctx, t, wf2)
   107  			return ctx
   108  		}).
   109  		Feature()
   110  
   111  	f2f.Test(t, feat)
   112  }
   113  
   114  func TestTxRowLockingSequential(t *testing.T) {
   115  	var bannerProjectIDs = []string{"foo"}
   116  	var handle server.DBHandle
   117  	var wf = model.WatchedField{
   118  		APIVersion: "foo/bar1baz2",
   119  		Kind:       "NoisyKind",
   120  		Name:       "frank",
   121  		Timestamp:  time.Now().UTC().Truncate(time.Microsecond),
   122  		Fields: []model.FieldValue{
   123  			model.FieldValue{
   124  				JSONPath: "$.spec.foo",
   125  				Value:    "{\"bar\": \"baz\"}",
   126  				Missing:  false,
   127  			},
   128  		},
   129  	}
   130  
   131  	var feat = f2.NewFeature(t.Name()).
   132  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
   133  			var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
   134  			wf.Cluster = clusterEdgeIDs["foo"]
   135  			return ctx
   136  		}).
   137  		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
   138  			handle = server.DBHandle{
   139  				DB: postgres.FromContextT(ctx, t).DB(),
   140  			}
   141  			return ctx
   142  		}).
   143  		Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
   144  			require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
   145  			validateWatchedFieldInDatabase(ctx, t, wf)
   146  			return ctx
   147  		}).
   148  		Test("set sequential watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
   149  			var now = time.Now().UTC().Truncate(time.Microsecond)
   150  			wf1, wf2 := wf, wf
   151  			wf1.Timestamp = now
   152  			wf2.Timestamp = now.Add(time.Microsecond)
   153  
   154  			tx1, err := handle.DB.BeginTx(ctx, nil)
   155  			require.NoError(t, err)
   156  			tx2, err := handle.DB.BeginTx(ctx, nil)
   157  			require.NoError(t, err)
   158  
   159  			// The first transaction should lock the row.
   160  			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
   161  			require.NoError(t, err, "could not upsert watched field object")
   162  
   163  			var tx2ch = make(chan error)
   164  			go func() {
   165  				// this should be blocked until the first transaction completes.
   166  				id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
   167  				tx2ch <- err
   168  				tx2ch <- server.TxSetWatchedFieldValues(tx2, wf2, id2)
   169  				tx2ch <- tx2.Commit()
   170  				close(tx2ch)
   171  			}()
   172  
   173  			select {
   174  			case <-time.After(time.Second):
   175  				// row is locked
   176  				// set the first transaction's watched field values
   177  				require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
   178  			case err := <-tx2ch:
   179  				t.Logf("error returned by second transaction: %v", err)
   180  				t.Fatal("second transaction should be blocked until the first completes")
   181  			}
   182  
   183  			select {
   184  			case <-time.After(time.Second):
   185  				// row is stil locked
   186  				// commit the first transaction
   187  				require.NoError(t, tx1.Commit(), "could not commit first transaction")
   188  			case err := <-tx2ch:
   189  				t.Logf("error returned by second transaction: %v", err)
   190  				t.Fatal("second transaction should be blocked until the first completes")
   191  			}
   192  
   193  			var timeout = time.After(5 * time.Second)
   194  			for i := 0; i < 3; i++ {
   195  				select {
   196  				case <-timeout:
   197  					t.Fatalf("second transaction did not complete quick enough")
   198  				case err := <-tx2ch:
   199  					require.NoError(t, err, "second transaction failed")
   200  				}
   201  			}
   202  
   203  			// validate the 2nd transaction's watched field was written to the database.
   204  			validateWatchedFieldInDatabase(ctx, t, wf2)
   205  			return ctx
   206  		}).
   207  		Test("set watched field object, then delete it in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
   208  			var now = time.Now().UTC().Truncate(time.Microsecond)
   209  			wf1, wf2 := wf, wf
   210  			wf1.Timestamp = now
   211  			wf2.Timestamp = now.Add(time.Microsecond)
   212  
   213  			// The first transaction should lock the row.
   214  			tx1, err := handle.DB.BeginTx(ctx, nil)
   215  			require.NoError(t, err)
   216  			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
   217  			require.NoError(t, err, "could not upsert watched field object")
   218  
   219  			var tx2ch = make(chan error)
   220  			go func() {
   221  				// this should be blocked until the first transaction completes.
   222  				tx2ch <- handle.DeleteWatchedField(ctx, wf2)
   223  				close(tx2ch)
   224  			}()
   225  
   226  			select {
   227  			case <-time.After(time.Second):
   228  				// row is locked
   229  				// set the first transaction's watched field values
   230  				require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
   231  			case err := <-tx2ch:
   232  				t.Logf("error returned by second transaction: %v", err)
   233  				t.Fatal("second transaction should be blocked until the first completes")
   234  			}
   235  
   236  			select {
   237  			case <-time.After(time.Second):
   238  				// row is stil locked
   239  				// commit the first transaction
   240  				require.NoError(t, tx1.Commit(), "could not commit first transaction")
   241  			case err := <-tx2ch:
   242  				t.Logf("error returned by second transaction: %v", err)
   243  				t.Fatal("second transaction should be blocked until the first completes")
   244  			}
   245  
   246  			select {
   247  			case <-time.After(time.Second):
   248  				t.Fatalf("second transaction did not complete quick enough")
   249  			case err := <-tx2ch:
   250  				require.NoError(t, err, "second transaction failed")
   251  			}
   252  
   253  			// validate the 2nd transaction marked the watched field object as deleted in the database.
   254  			validateWatchedFieldDeletedInDatabase(ctx, t, wf2)
   255  			return ctx
   256  		}).
   257  		Feature()
   258  
   259  	f2f.Test(t, feat)
   260  }
   261  
   262  func TestTxRowLockingOutdated(t *testing.T) {
   263  	var bannerProjectIDs = []string{"foo"}
   264  	var handle server.DBHandle
   265  	var wf = model.WatchedField{
   266  		APIVersion: "foo/bar1baz2",
   267  		Kind:       "NoisyKind",
   268  		Name:       "frank",
   269  		Timestamp:  time.Now().UTC().Truncate(time.Microsecond),
   270  		Fields: []model.FieldValue{
   271  			model.FieldValue{
   272  				JSONPath: "$.spec.foo",
   273  				Value:    "{\"bar\": \"baz\"}",
   274  				Missing:  false,
   275  			},
   276  		},
   277  	}
   278  
   279  	var feat = f2.NewFeature(t.Name()).
   280  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
   281  			var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
   282  			wf.Cluster = clusterEdgeIDs["foo"]
   283  			return ctx
   284  		}).
   285  		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
   286  			handle = server.DBHandle{
   287  				DB: postgres.FromContextT(ctx, t).DB(),
   288  			}
   289  			return ctx
   290  		}).
   291  		Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
   292  			require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
   293  			validateWatchedFieldInDatabase(ctx, t, wf)
   294  			return ctx
   295  		}).
   296  		Test("set out-of-order watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
   297  			var now = time.Now().UTC().Truncate(time.Microsecond)
   298  			wf1, wf2 := wf, wf
   299  			wf1.Timestamp = now.Add(time.Microsecond)
   300  			wf2.Timestamp = now // wf2 is outdated
   301  
   302  			tx1, err := handle.DB.BeginTx(ctx, nil)
   303  			require.NoError(t, err)
   304  			tx2, err := handle.DB.BeginTx(ctx, nil)
   305  			require.NoError(t, err)
   306  
   307  			// The first transaction should lock the row.
   308  			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
   309  			require.NoError(t, err, "could not upsert watched field object")
   310  
   311  			var tx2ch = make(chan error)
   312  			go func() {
   313  				// this should be blocked until the first transaction completes.
   314  				id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
   315  				require.Nil(t, id2, "second transaction should not return an object_id for the outdated watched field object")
   316  				tx2ch <- err
   317  				tx2ch <- tx2.Commit() // this is a rollback in real code, but a commit should be safe too.
   318  				close(tx2ch)
   319  			}()
   320  
   321  			select {
   322  			case <-time.After(time.Second):
   323  				// row is locked
   324  				// set the first transaction's watched field values
   325  				require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
   326  			case err := <-tx2ch:
   327  				t.Logf("error returned by second transaction: %v", err)
   328  				t.Fatal("second transaction should be blocked until the first completes")
   329  			}
   330  
   331  			select {
   332  			case <-time.After(time.Second):
   333  				// row is stil locked
   334  				// commit the first transaction
   335  				require.NoError(t, tx1.Commit(), "could not commit first transaction")
   336  			case err := <-tx2ch:
   337  				t.Logf("error returned by second transaction: %v", err)
   338  				t.Fatal("second transaction should be blocked until the first completes")
   339  			}
   340  
   341  			var timeout = time.After(5 * time.Second)
   342  			select {
   343  			case <-timeout:
   344  				t.Fatal("second transaction took too long to complete")
   345  			case err := <-tx2ch:
   346  				// outdated watched fields should return ErrIgnoredMessage so that the pubsub message is acked without an error log.
   347  				require.Error(t, err, "the second transaction should return an error")
   348  				require.ErrorIs(t, err, server.ErrIgnoredMessage, "the second transaction should not upsert anything")
   349  			}
   350  			select {
   351  			case <-timeout:
   352  				t.Fatal("second transaction took too long to commit")
   353  			case err := <-tx2ch:
   354  				require.NoError(t, err, "second transaction commit failed")
   355  			}
   356  
   357  			// validate the 1st transaction's watched field is still in the database.
   358  			validateWatchedFieldInDatabase(ctx, t, wf1)
   359  			return ctx
   360  		}).
   361  		Test("set and delete out-of-order watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
   362  			var now = time.Now().UTC().Truncate(time.Microsecond)
   363  			wf1, wf2 := wf, wf
   364  			wf1.Timestamp = now.Add(time.Microsecond)
   365  			wf2.Timestamp = now // wf2 is outdated
   366  
   367  			// The first transaction should lock the row.
   368  			tx1, err := handle.DB.BeginTx(ctx, nil)
   369  			require.NoError(t, err)
   370  			id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
   371  			require.NoError(t, err, "could not upsert watched field object")
   372  
   373  			var tx2ch = make(chan error)
   374  			go func() {
   375  				// this should be blocked until the first transaction completes.
   376  				tx2ch <- handle.DeleteWatchedField(ctx, wf2)
   377  				close(tx2ch)
   378  			}()
   379  
   380  			select {
   381  			case <-time.After(time.Second):
   382  				// row is locked
   383  				// set the first transaction's watched field values
   384  				require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
   385  			case err := <-tx2ch:
   386  				t.Logf("error returned by second transaction: %v", err)
   387  				t.Fatal("second transaction should be blocked until the first completes")
   388  			}
   389  
   390  			select {
   391  			case <-time.After(time.Second):
   392  				// row is stil locked
   393  				// commit the first transaction
   394  				require.NoError(t, tx1.Commit(), "could not commit first transaction")
   395  			case err := <-tx2ch:
   396  				t.Logf("error returned by second transaction: %v", err)
   397  				t.Fatal("second transaction should be blocked until the first completes")
   398  			}
   399  
   400  			select {
   401  			case <-time.After(time.Second):
   402  				t.Fatal("second transaction took too long to complete")
   403  			case err := <-tx2ch:
   404  				// outdated watched fields should return ErrIgnoredMessage so that the pubsub message is acked without an error log.
   405  				require.ErrorIs(t, err, server.ErrIgnoredMessage, "the second transaction should return an ignored message error for outdated deletes")
   406  			}
   407  
   408  			// validate the 1st transaction's watched field is still in the database.
   409  			validateWatchedFieldInDatabase(ctx, t, wf1)
   410  			return ctx
   411  		}).
   412  		Feature()
   413  
   414  	f2f.Test(t, feat)
   415  }
   416  

View as plain text