...

Source file src/edge-infra.dev/pkg/edge/psqlinjector/integration/sql_test.go

Documentation: edge-infra.dev/pkg/edge/psqlinjector/integration

     1  package server_test
     2  
     3  import (
     4  	"testing"
     5  	"time"
     6  
     7  	server "edge-infra.dev/pkg/edge/psqlinjector"
     8  	"edge-infra.dev/pkg/f8n/kinform/model"
     9  	"edge-infra.dev/test/f2"
    10  	"edge-infra.dev/test/f2/x/postgres"
    11  
    12  	"github.com/google/uuid"
    13  	"github.com/stretchr/testify/require"
    14  )
    15  
    16  func validateWatchedFieldInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) {
    17  	var db = postgres.FromContextT(ctx, t).DB()
    18  	const selectWatchedFieldObjectID = "SELECT object_id, watched_at FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5) AND deleted IS FALSE"
    19  	const selectWatchedFieldValues = "SELECT jsonpath, value, missing FROM watched_field_values WHERE object_id = $1"
    20  
    21  	var timestamp time.Time
    22  	var objectID uuid.UUID
    23  	var row = db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace)
    24  	require.NoError(t, row.Scan(&objectID, &timestamp), "error getting watched_field_objects object_id and watched_at")
    25  	require.True(t, timestamp.Equal(wf.Timestamp), "timestamp does not match watched_field_objects.watched_at")
    26  
    27  	var m = make(map[string]model.FieldValue) // map[JSONPath]model.FieldValue
    28  	var rows, err = db.QueryContext(ctx, selectWatchedFieldValues, objectID.String())
    29  	require.NoError(t, err, "error selecting watched field values")
    30  	for rows.Next() {
    31  		var fv model.FieldValue
    32  		require.NoError(t, rows.Scan(&fv.JSONPath, &fv.Value, &fv.Missing), "error scanning watched_field_values")
    33  		m[fv.JSONPath] = fv
    34  	}
    35  	require.NoError(t, rows.Err())
    36  	require.Equal(t, len(m), len(wf.Fields))
    37  
    38  	for _, fv := range wf.Fields {
    39  		var x = m[fv.JSONPath]
    40  		require.Equal(t, x.JSONPath, fv.JSONPath)
    41  		require.Equal(t, x.Value, fv.Value)
    42  		require.Equal(t, x.Missing, fv.Missing)
    43  	}
    44  }
    45  
    46  func validateWatchedFieldDeletedInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) {
    47  	var db = postgres.FromContextT(ctx, t).DB()
    48  	const selectWatchedFieldObjectID = "SELECT watched_at, deleted FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5)"
    49  
    50  	var (
    51  		timestamp time.Time
    52  		deleted   bool
    53  	)
    54  	var row = db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace)
    55  	require.NoError(t, row.Scan(&timestamp, &deleted), "error getting watched_field_objects object_id and watched_at")
    56  	require.True(t, timestamp.Equal(wf.Timestamp), "timestamp does not match watched_field_objects.watched_at")
    57  	require.True(t, deleted, "the object is not marked deleted")
    58  }
    59  
    60  func validateWatchedFieldGarbageCollectedInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) {
    61  	var db = postgres.FromContextT(ctx, t).DB()
    62  	const selectWatchedFieldObjectID = "SELECT COUNT(*) FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5)"
    63  	var count int
    64  	require.NoError(t, db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace).Scan(&count))
    65  	require.Equal(t, 0, count, "watched field not gargage collected")
    66  }
    67  
    68  func validateWatchedFieldObjectsCount(ctx f2.Context, t *testing.T, expected int) {
    69  	var actual int
    70  	var handle = server.DBHandle{
    71  		DB: postgres.FromContextT(ctx, t).DB(),
    72  	}
    73  
    74  	const stmtCountObjects = `SELECT COUNT(*) FROM watched_field_objects WHERE deleted IS FALSE`
    75  	require.NoError(t, handle.DB.QueryRow(stmtCountObjects).Scan(&actual), "error counting watched_field_objects table")
    76  	require.Equal(t, expected, actual, "the amount of watched field objects is incorrect")
    77  }
    78  
    79  func validateWatchedFieldValuesCount(ctx f2.Context, t *testing.T, expected int) {
    80  	var actual int
    81  	var handle = server.DBHandle{
    82  		DB: postgres.FromContextT(ctx, t).DB(),
    83  	}
    84  
    85  	const stmtCountValues = `SELECT COUNT(*) FROM watched_field_values WHERE object_id IN (SELECT object_id FROM watched_field_objects WHERE deleted IS FALSE)`
    86  	require.NoError(t, handle.DB.QueryRow(stmtCountValues).Scan(&actual), "error counting watched_field_values table")
    87  	require.Equal(t, expected, actual, "the amount of watched field values is incorrect")
    88  }
    89  
    90  func TestSQLSetClusterHeartbeatTime(t *testing.T) {
    91  	var bannerProjectIDs = []string{"foo", "bar", "baz"}
    92  
    93  	// map[project_id]cluster_edge_id
    94  	var clusterEdgeIDs map[string]uuid.UUID
    95  
    96  	var feat = f2.NewFeature(t.Name()).
    97  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
    98  			clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
    99  			return ctx
   100  		}).
   101  		Test("set cluster heartbeat time", func(ctx f2.Context, t *testing.T) f2.Context {
   102  			var handle = server.DBHandle{
   103  				DB: postgres.FromContextT(ctx, t).DB(),
   104  			}
   105  
   106  			var statusTimes = make(map[string]time.Time)
   107  			for p, ceid := range clusterEdgeIDs {
   108  				statusTimes[p] = time.Now().UTC().Truncate(time.Microsecond)
   109  
   110  				err := handle.SetClusterHeartbeatTime(ctx, statusTimes[p], ceid)
   111  				require.NoError(t, err, "error setting cluster heartbeat time")
   112  			}
   113  
   114  			// Check that the scanned times equal the set times.
   115  			for p, updatedAt := range selectInfraStatusUpdatedAt(ctx, t, clusterEdgeIDs) {
   116  				require.Equal(t, updatedAt, statusTimes[p], "infra_status_updated_at not equal to set value")
   117  			}
   118  
   119  			return ctx
   120  		}).
   121  		Feature()
   122  
   123  	f2f.Test(t, feat)
   124  }
   125  
   126  func TestSQLGetProjectIDs(t *testing.T) {
   127  	var bannerProjectIDs = []string{"foo", "bar", "baz"}
   128  	var handle server.DBHandle
   129  
   130  	var feat = f2.NewFeature(t.Name()).
   131  		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
   132  			handle = server.DBHandle{
   133  				DB: postgres.FromContextT(ctx, t).DB(),
   134  			}
   135  			return ctx
   136  		}).
   137  		Test("empty", func(ctx f2.Context, t *testing.T) f2.Context {
   138  			foundProjectIDs, err := handle.GetBannerProjectIDs(ctx)
   139  			require.NoError(t, err, "error getting project IDs from DBHandle")
   140  			require.Empty(t, foundProjectIDs, "got project ids before db was populated")
   141  			return ctx
   142  		}).
   143  		Test("GetBannerProjectIDs", func(ctx f2.Context, t *testing.T) f2.Context {
   144  			populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
   145  
   146  			foundProjectIDs, err := handle.GetBannerProjectIDs(ctx)
   147  			require.NoError(t, err, "error getting project IDs from DBHandle")
   148  			require.ElementsMatch(t, bannerProjectIDs, foundProjectIDs, "got unknown or missing project ids")
   149  			return ctx
   150  		}).
   151  		Feature()
   152  
   153  	f2f.Test(t, feat)
   154  }
   155  
   156  func TestSQLSetWatchedField(t *testing.T) {
   157  	var bannerProjectIDs = []string{"foo", "bar", "baz"}
   158  	var handle server.DBHandle
   159  	var clusterEdgeIDs map[string]uuid.UUID
   160  	var wf = model.WatchedField{
   161  		APIVersion: "hello/world",
   162  		Kind:       "MyKind",
   163  		Name:       "asdf",
   164  		Timestamp:  time.Now().UTC().Truncate(time.Microsecond),
   165  		Fields: []model.FieldValue{
   166  			model.FieldValue{
   167  				JSONPath: "$.spec.foo",
   168  				Value:    "{\"bar\": \"baz\"}",
   169  				Missing:  false,
   170  			},
   171  		},
   172  	}
   173  
   174  	var feat = f2.NewFeature(t.Name()).
   175  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
   176  			clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
   177  			wf.Cluster = clusterEdgeIDs["foo"]
   178  			return ctx
   179  		}).
   180  		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
   181  			handle = server.DBHandle{
   182  				DB: postgres.FromContextT(ctx, t).DB(),
   183  			}
   184  			return ctx
   185  		}).
   186  		Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
   187  			require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
   188  			validateWatchedFieldInDatabase(ctx, t, wf)
   189  			return ctx
   190  		}).
   191  		Test("set more watched fields", func(ctx f2.Context, t *testing.T) f2.Context {
   192  			wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond)
   193  			wf.Fields = append(wf.Fields, model.FieldValue{
   194  				JSONPath: "$.foo.bar",
   195  				Value:    "\"asdf\"",
   196  				Missing:  false,
   197  			})
   198  			require.NoError(t, handle.SetWatchedField(ctx, wf))
   199  			validateWatchedFieldInDatabase(ctx, t, wf)
   200  			return ctx
   201  		}).
   202  		Test("set watched fields but cause one to be deleted", func(ctx f2.Context, t *testing.T) f2.Context {
   203  			var existing = wf.Fields[0]
   204  			wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond)
   205  			wf.Fields = []model.FieldValue{
   206  				existing,
   207  				model.FieldValue{
   208  					JSONPath: "$.another.one",
   209  					Missing:  true,
   210  				},
   211  			}
   212  
   213  			require.NoError(t, handle.SetWatchedField(ctx, wf))
   214  			validateWatchedFieldInDatabase(ctx, t, wf)
   215  			return ctx
   216  		}).
   217  		Test("set with only the timestamp updated", func(ctx f2.Context, t *testing.T) f2.Context {
   218  			wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond)
   219  			require.NoError(t, handle.SetWatchedField(ctx, wf))
   220  			validateWatchedFieldInDatabase(ctx, t, wf)
   221  			return ctx
   222  		}).
   223  		Feature()
   224  
   225  	f2f.Test(t, feat)
   226  }
   227  
   228  func TestSQLDeleteWatchedField(t *testing.T) {
   229  	var bannerProjectIDs = []string{"foo"}
   230  	var handle server.DBHandle
   231  	var clusterEdgeIDs map[string]uuid.UUID
   232  	var now = time.Now().UTC().Truncate(time.Microsecond)
   233  	var wf = model.WatchedField{
   234  		APIVersion: "hello/world",
   235  		Kind:       "MyKind",
   236  		Name:       "asdf",
   237  		Timestamp:  now,
   238  		Fields: []model.FieldValue{
   239  			model.FieldValue{
   240  				JSONPath: "$.spec.foo",
   241  				Value:    "{\"bar\": \"baz\"}",
   242  				Missing:  false,
   243  			},
   244  		},
   245  	}
   246  	var deleted = model.WatchedField{
   247  		APIVersion: "hello/world",
   248  		Kind:       "MyKind",
   249  		Name:       "deleted",
   250  		Timestamp:  now,
   251  		Fields: []model.FieldValue{
   252  			model.FieldValue{
   253  				JSONPath: "$.spec.foo",
   254  				Value:    "{\"bar\": \"baz\"}",
   255  				Missing:  false,
   256  			},
   257  		},
   258  	}
   259  
   260  	var feat = f2.NewFeature(t.Name()).
   261  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
   262  			clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
   263  			wf.Cluster = clusterEdgeIDs["foo"]
   264  			deleted.Cluster = clusterEdgeIDs["foo"]
   265  			return ctx
   266  		}).
   267  		Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
   268  			handle = server.DBHandle{
   269  				DB: postgres.FromContextT(ctx, t).DB(),
   270  			}
   271  			return ctx
   272  		}).
   273  		Test("set watched fields", func(ctx f2.Context, t *testing.T) f2.Context {
   274  			require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
   275  			require.NoError(t, handle.SetWatchedField(ctx, deleted), "error setting watched field")
   276  			validateWatchedFieldInDatabase(ctx, t, wf)
   277  			validateWatchedFieldInDatabase(ctx, t, deleted)
   278  
   279  			// check initial counts
   280  			validateWatchedFieldObjectsCount(ctx, t, 2)
   281  			validateWatchedFieldValuesCount(ctx, t, 2)
   282  
   283  			return ctx
   284  		}).
   285  		Test("delete watched field", func(ctx f2.Context, t *testing.T) f2.Context {
   286  			deleted.Timestamp = now.Add(time.Microsecond)
   287  			require.NoError(t, handle.DeleteWatchedField(ctx, deleted))
   288  
   289  			validateWatchedFieldDeletedInDatabase(ctx, t, deleted)
   290  			validateWatchedFieldObjectsCount(ctx, t, 1)
   291  			validateWatchedFieldValuesCount(ctx, t, 1)
   292  
   293  			return ctx
   294  		}).
   295  		Feature()
   296  
   297  	f2f.Test(t, feat)
   298  }
   299  
   300  func TestDeleteOutdatedWatchedFieldObjects(t *testing.T) {
   301  	var bannerProjectIDs = []string{"foo", "bar", "baz", "neo", "zed"}
   302  	var handle server.DBHandle
   303  	var clusterEdgeIDs map[string]uuid.UUID
   304  	var wfs []model.WatchedField
   305  	var now = time.Now().UTC().Truncate(time.Microsecond)
   306  	var feat = f2.NewFeature(t.Name()).
   307  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
   308  			clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
   309  			handle = server.DBHandle{
   310  				DB: postgres.FromContextT(ctx, t).DB(),
   311  			}
   312  			return ctx
   313  		}).
   314  		Test("create and set watched field objects", func(ctx f2.Context, t *testing.T) f2.Context {
   315  			var apiVersions = []string{
   316  				"hello.world/v1",
   317  				"hello.world/v1beta1",
   318  				"foo.bar/v2",
   319  				"v1",
   320  			}
   321  			var kinds = []string{
   322  				"Dog",
   323  				"Cat",
   324  				"Fish",
   325  				"Bird",
   326  				"Zebra",
   327  				"AntEater",
   328  			}
   329  			// Postgres has microsecond precision for times. The scrape messages will be within this microsecond window.
   330  			var timestamps = []time.Time{
   331  				now.Add(-time.Microsecond), // before
   332  				now.Add(time.Microsecond),  // after
   333  			}
   334  			for i := 0; i < 42; i++ {
   335  				var wf = model.WatchedField{
   336  					Cluster:    clusterEdgeIDs[bannerProjectIDs[i%len(clusterEdgeIDs)]],
   337  					APIVersion: apiVersions[i%len(apiVersions)],
   338  					Kind:       kinds[i%len(kinds)],
   339  					Name:       uuid.New().String(),
   340  					Timestamp:  timestamps[i%2],
   341  					Fields: []model.FieldValue{
   342  						model.FieldValue{
   343  							JSONPath: "$.spec.foo",
   344  							Value:    "{\"bar\": \"baz\"}",
   345  							Missing:  false,
   346  						},
   347  					},
   348  				}
   349  				require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
   350  				validateWatchedFieldInDatabase(ctx, t, wf)
   351  				wfs = append(wfs, wf)
   352  			}
   353  			validateWatchedFieldObjectsCount(ctx, t, len(wfs))
   354  			return ctx
   355  		}).
   356  		Test("handle a ScrapeMessage that does not mark any objects deleted", func(ctx f2.Context, t *testing.T) f2.Context {
   357  			for _, cid := range clusterEdgeIDs {
   358  				var sm = &model.ScrapeMessage{
   359  					Cluster:   cid,
   360  					StartTime: now.Add(-time.Minute),
   361  				}
   362  				require.NoError(t, handle.DeleteOutdatedWatchedFieldObjects(ctx, *sm))
   363  			}
   364  			validateWatchedFieldObjectsCount(ctx, t, len(wfs))
   365  			return ctx
   366  		}).
   367  		Test("handle a ScrapeMessage for each cluster to delete outdated objects", func(ctx f2.Context, t *testing.T) f2.Context {
   368  			for _, cid := range clusterEdgeIDs {
   369  				var sm = &model.ScrapeMessage{
   370  					Cluster:   cid,
   371  					StartTime: now,
   372  				}
   373  				require.NoError(t, handle.DeleteOutdatedWatchedFieldObjects(ctx, *sm))
   374  			}
   375  			return ctx
   376  		}).
   377  		Test("verify outdated objects were deleted", func(ctx f2.Context, t *testing.T) f2.Context {
   378  			var expectedCount int
   379  			for _, wf := range wfs {
   380  				if wf.Timestamp.Before(now) {
   381  					// this message should have been deleted by the ScrapeMessage.
   382  					// before calling validate, update the watched field's timestamp to match the ScrapeMessage.
   383  					wf.Timestamp = now
   384  					validateWatchedFieldDeletedInDatabase(ctx, t, wf)
   385  				} else {
   386  					// ensure the good objects remained untouched.
   387  					validateWatchedFieldInDatabase(ctx, t, wf)
   388  					expectedCount++
   389  				}
   390  			}
   391  			validateWatchedFieldObjectsCount(ctx, t, expectedCount)
   392  			return ctx
   393  		}).
   394  		Feature()
   395  
   396  	f2f.Test(t, feat)
   397  }
   398  
   399  func TestGarbageCollectDeletedWatchedFieldObjects(t *testing.T) {
   400  	var bannerProjectIDs = []string{"foo", "bar", "baz", "neo", "zed"}
   401  	var handle server.DBHandle
   402  	var clusterEdgeIDs map[string]uuid.UUID
   403  	var wfs []model.WatchedField
   404  	var feat = f2.NewFeature(t.Name()).
   405  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
   406  			clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
   407  			handle = server.DBHandle{
   408  				DB: postgres.FromContextT(ctx, t).DB(),
   409  			}
   410  			return ctx
   411  		}).
   412  		Test("create and set watched field objects", func(ctx f2.Context, t *testing.T) f2.Context {
   413  			var apiVersions = []string{
   414  				"hello.world/v1",
   415  				"hello.world/v1beta1",
   416  				"foo.bar/v2",
   417  				"v1",
   418  			}
   419  			var kinds = []string{
   420  				"Dog",
   421  				"Cat",
   422  				"Fish",
   423  				"Bird",
   424  				"Zebra",
   425  				"AntEater",
   426  			}
   427  			const delta = time.Hour + 3*time.Minute
   428  			var timestamp = time.Now().UTC().Truncate(time.Microsecond).Add(-delta) // should be deleted.
   429  			for i := 0; i < 40; i++ {
   430  				var wf = model.WatchedField{
   431  					Cluster:    clusterEdgeIDs[bannerProjectIDs[i%len(clusterEdgeIDs)]],
   432  					APIVersion: apiVersions[i%len(apiVersions)],
   433  					Kind:       kinds[i%len(kinds)],
   434  					Name:       uuid.New().String(),
   435  					Timestamp:  timestamp,
   436  					Fields: []model.FieldValue{
   437  						model.FieldValue{
   438  							JSONPath: "$.spec.foo",
   439  							Value:    "{\"bar\": \"baz\"}",
   440  							Missing:  false,
   441  						},
   442  					},
   443  				}
   444  				require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
   445  				validateWatchedFieldInDatabase(ctx, t, wf)
   446  				// mark half of the watched fields deleted
   447  				if i%4 == 0 {
   448  					// garbage collect 1/4th of the watched fields
   449  					wf.Timestamp = timestamp.Add(time.Microsecond)
   450  
   451  					require.NoError(t, handle.DeleteWatchedField(ctx, wf), "error marking watched field deleted")
   452  					validateWatchedFieldDeletedInDatabase(ctx, t, wf)
   453  				} else if i%2 == 0 {
   454  					// recently deleted watched fields are not garbage collected.
   455  					wf.Timestamp = timestamp.Add(delta)
   456  
   457  					require.NoError(t, handle.DeleteWatchedField(ctx, wf), "error marking watched field deleted")
   458  					validateWatchedFieldDeletedInDatabase(ctx, t, wf)
   459  				}
   460  				wfs = append(wfs, wf)
   461  			}
   462  			validateWatchedFieldObjectsCount(ctx, t, len(wfs)/2)
   463  			return ctx
   464  		}).
   465  		Test("garbage collect the deleted watched fields", func(ctx f2.Context, t *testing.T) f2.Context {
   466  			actual, err := handle.GarbageCollectDeletedWatchedFieldObjects(ctx)
   467  			require.NoError(t, err, "could not garbage collect outdated watched field objects")
   468  			require.Equal(t, actual, len(wfs)/4, "did not garbace collect 1/4th of the watched field objects")
   469  			for i, wf := range wfs {
   470  				if i%4 == 0 {
   471  					validateWatchedFieldGarbageCollectedInDatabase(ctx, t, wf)
   472  				} else if i%2 == 0 {
   473  					validateWatchedFieldDeletedInDatabase(ctx, t, wf)
   474  				} else {
   475  					validateWatchedFieldInDatabase(ctx, t, wf)
   476  				}
   477  			}
   478  			return ctx
   479  		}).
   480  		Feature()
   481  
   482  	f2f.Test(t, feat)
   483  }
   484  

View as plain text