1 package server_test
2
3 import (
4 "testing"
5 "time"
6
7 server "edge-infra.dev/pkg/edge/psqlinjector"
8 "edge-infra.dev/pkg/f8n/kinform/model"
9 "edge-infra.dev/test/f2"
10 "edge-infra.dev/test/f2/x/postgres"
11
12 "github.com/google/uuid"
13 "github.com/stretchr/testify/require"
14 )
15
16 func validateWatchedFieldInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) {
17 var db = postgres.FromContextT(ctx, t).DB()
18 const selectWatchedFieldObjectID = "SELECT object_id, watched_at FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5) AND deleted IS FALSE"
19 const selectWatchedFieldValues = "SELECT jsonpath, value, missing FROM watched_field_values WHERE object_id = $1"
20
21 var timestamp time.Time
22 var objectID uuid.UUID
23 var row = db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace)
24 require.NoError(t, row.Scan(&objectID, ×tamp), "error getting watched_field_objects object_id and watched_at")
25 require.True(t, timestamp.Equal(wf.Timestamp), "timestamp does not match watched_field_objects.watched_at")
26
27 var m = make(map[string]model.FieldValue)
28 var rows, err = db.QueryContext(ctx, selectWatchedFieldValues, objectID.String())
29 require.NoError(t, err, "error selecting watched field values")
30 for rows.Next() {
31 var fv model.FieldValue
32 require.NoError(t, rows.Scan(&fv.JSONPath, &fv.Value, &fv.Missing), "error scanning watched_field_values")
33 m[fv.JSONPath] = fv
34 }
35 require.NoError(t, rows.Err())
36 require.Equal(t, len(m), len(wf.Fields))
37
38 for _, fv := range wf.Fields {
39 var x = m[fv.JSONPath]
40 require.Equal(t, x.JSONPath, fv.JSONPath)
41 require.Equal(t, x.Value, fv.Value)
42 require.Equal(t, x.Missing, fv.Missing)
43 }
44 }
45
46 func validateWatchedFieldDeletedInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) {
47 var db = postgres.FromContextT(ctx, t).DB()
48 const selectWatchedFieldObjectID = "SELECT watched_at, deleted FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5)"
49
50 var (
51 timestamp time.Time
52 deleted bool
53 )
54 var row = db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace)
55 require.NoError(t, row.Scan(×tamp, &deleted), "error getting watched_field_objects object_id and watched_at")
56 require.True(t, timestamp.Equal(wf.Timestamp), "timestamp does not match watched_field_objects.watched_at")
57 require.True(t, deleted, "the object is not marked deleted")
58 }
59
60 func validateWatchedFieldGarbageCollectedInDatabase(ctx f2.Context, t *testing.T, wf model.WatchedField) {
61 var db = postgres.FromContextT(ctx, t).DB()
62 const selectWatchedFieldObjectID = "SELECT COUNT(*) FROM watched_field_objects WHERE (cluster_edge_id, api_version, kind, name, namespace) = ($1, $2, $3, $4, $5)"
63 var count int
64 require.NoError(t, db.QueryRowContext(ctx, selectWatchedFieldObjectID, wf.Cluster.String(), wf.APIVersion, wf.Kind, wf.Name, wf.Namespace).Scan(&count))
65 require.Equal(t, 0, count, "watched field not gargage collected")
66 }
67
68 func validateWatchedFieldObjectsCount(ctx f2.Context, t *testing.T, expected int) {
69 var actual int
70 var handle = server.DBHandle{
71 DB: postgres.FromContextT(ctx, t).DB(),
72 }
73
74 const stmtCountObjects = `SELECT COUNT(*) FROM watched_field_objects WHERE deleted IS FALSE`
75 require.NoError(t, handle.DB.QueryRow(stmtCountObjects).Scan(&actual), "error counting watched_field_objects table")
76 require.Equal(t, expected, actual, "the amount of watched field objects is incorrect")
77 }
78
79 func validateWatchedFieldValuesCount(ctx f2.Context, t *testing.T, expected int) {
80 var actual int
81 var handle = server.DBHandle{
82 DB: postgres.FromContextT(ctx, t).DB(),
83 }
84
85 const stmtCountValues = `SELECT COUNT(*) FROM watched_field_values WHERE object_id IN (SELECT object_id FROM watched_field_objects WHERE deleted IS FALSE)`
86 require.NoError(t, handle.DB.QueryRow(stmtCountValues).Scan(&actual), "error counting watched_field_values table")
87 require.Equal(t, expected, actual, "the amount of watched field values is incorrect")
88 }
89
90 func TestSQLSetClusterHeartbeatTime(t *testing.T) {
91 var bannerProjectIDs = []string{"foo", "bar", "baz"}
92
93
94 var clusterEdgeIDs map[string]uuid.UUID
95
96 var feat = f2.NewFeature(t.Name()).
97 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
98 clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
99 return ctx
100 }).
101 Test("set cluster heartbeat time", func(ctx f2.Context, t *testing.T) f2.Context {
102 var handle = server.DBHandle{
103 DB: postgres.FromContextT(ctx, t).DB(),
104 }
105
106 var statusTimes = make(map[string]time.Time)
107 for p, ceid := range clusterEdgeIDs {
108 statusTimes[p] = time.Now().UTC().Truncate(time.Microsecond)
109
110 err := handle.SetClusterHeartbeatTime(ctx, statusTimes[p], ceid)
111 require.NoError(t, err, "error setting cluster heartbeat time")
112 }
113
114
115 for p, updatedAt := range selectInfraStatusUpdatedAt(ctx, t, clusterEdgeIDs) {
116 require.Equal(t, updatedAt, statusTimes[p], "infra_status_updated_at not equal to set value")
117 }
118
119 return ctx
120 }).
121 Feature()
122
123 f2f.Test(t, feat)
124 }
125
126 func TestSQLGetProjectIDs(t *testing.T) {
127 var bannerProjectIDs = []string{"foo", "bar", "baz"}
128 var handle server.DBHandle
129
130 var feat = f2.NewFeature(t.Name()).
131 Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
132 handle = server.DBHandle{
133 DB: postgres.FromContextT(ctx, t).DB(),
134 }
135 return ctx
136 }).
137 Test("empty", func(ctx f2.Context, t *testing.T) f2.Context {
138 foundProjectIDs, err := handle.GetBannerProjectIDs(ctx)
139 require.NoError(t, err, "error getting project IDs from DBHandle")
140 require.Empty(t, foundProjectIDs, "got project ids before db was populated")
141 return ctx
142 }).
143 Test("GetBannerProjectIDs", func(ctx f2.Context, t *testing.T) f2.Context {
144 populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
145
146 foundProjectIDs, err := handle.GetBannerProjectIDs(ctx)
147 require.NoError(t, err, "error getting project IDs from DBHandle")
148 require.ElementsMatch(t, bannerProjectIDs, foundProjectIDs, "got unknown or missing project ids")
149 return ctx
150 }).
151 Feature()
152
153 f2f.Test(t, feat)
154 }
155
156 func TestSQLSetWatchedField(t *testing.T) {
157 var bannerProjectIDs = []string{"foo", "bar", "baz"}
158 var handle server.DBHandle
159 var clusterEdgeIDs map[string]uuid.UUID
160 var wf = model.WatchedField{
161 APIVersion: "hello/world",
162 Kind: "MyKind",
163 Name: "asdf",
164 Timestamp: time.Now().UTC().Truncate(time.Microsecond),
165 Fields: []model.FieldValue{
166 model.FieldValue{
167 JSONPath: "$.spec.foo",
168 Value: "{\"bar\": \"baz\"}",
169 Missing: false,
170 },
171 },
172 }
173
174 var feat = f2.NewFeature(t.Name()).
175 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
176 clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
177 wf.Cluster = clusterEdgeIDs["foo"]
178 return ctx
179 }).
180 Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
181 handle = server.DBHandle{
182 DB: postgres.FromContextT(ctx, t).DB(),
183 }
184 return ctx
185 }).
186 Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
187 require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
188 validateWatchedFieldInDatabase(ctx, t, wf)
189 return ctx
190 }).
191 Test("set more watched fields", func(ctx f2.Context, t *testing.T) f2.Context {
192 wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond)
193 wf.Fields = append(wf.Fields, model.FieldValue{
194 JSONPath: "$.foo.bar",
195 Value: "\"asdf\"",
196 Missing: false,
197 })
198 require.NoError(t, handle.SetWatchedField(ctx, wf))
199 validateWatchedFieldInDatabase(ctx, t, wf)
200 return ctx
201 }).
202 Test("set watched fields but cause one to be deleted", func(ctx f2.Context, t *testing.T) f2.Context {
203 var existing = wf.Fields[0]
204 wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond)
205 wf.Fields = []model.FieldValue{
206 existing,
207 model.FieldValue{
208 JSONPath: "$.another.one",
209 Missing: true,
210 },
211 }
212
213 require.NoError(t, handle.SetWatchedField(ctx, wf))
214 validateWatchedFieldInDatabase(ctx, t, wf)
215 return ctx
216 }).
217 Test("set with only the timestamp updated", func(ctx f2.Context, t *testing.T) f2.Context {
218 wf.Timestamp = time.Now().UTC().Truncate(time.Microsecond)
219 require.NoError(t, handle.SetWatchedField(ctx, wf))
220 validateWatchedFieldInDatabase(ctx, t, wf)
221 return ctx
222 }).
223 Feature()
224
225 f2f.Test(t, feat)
226 }
227
228 func TestSQLDeleteWatchedField(t *testing.T) {
229 var bannerProjectIDs = []string{"foo"}
230 var handle server.DBHandle
231 var clusterEdgeIDs map[string]uuid.UUID
232 var now = time.Now().UTC().Truncate(time.Microsecond)
233 var wf = model.WatchedField{
234 APIVersion: "hello/world",
235 Kind: "MyKind",
236 Name: "asdf",
237 Timestamp: now,
238 Fields: []model.FieldValue{
239 model.FieldValue{
240 JSONPath: "$.spec.foo",
241 Value: "{\"bar\": \"baz\"}",
242 Missing: false,
243 },
244 },
245 }
246 var deleted = model.WatchedField{
247 APIVersion: "hello/world",
248 Kind: "MyKind",
249 Name: "deleted",
250 Timestamp: now,
251 Fields: []model.FieldValue{
252 model.FieldValue{
253 JSONPath: "$.spec.foo",
254 Value: "{\"bar\": \"baz\"}",
255 Missing: false,
256 },
257 },
258 }
259
260 var feat = f2.NewFeature(t.Name()).
261 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
262 clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
263 wf.Cluster = clusterEdgeIDs["foo"]
264 deleted.Cluster = clusterEdgeIDs["foo"]
265 return ctx
266 }).
267 Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
268 handle = server.DBHandle{
269 DB: postgres.FromContextT(ctx, t).DB(),
270 }
271 return ctx
272 }).
273 Test("set watched fields", func(ctx f2.Context, t *testing.T) f2.Context {
274 require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
275 require.NoError(t, handle.SetWatchedField(ctx, deleted), "error setting watched field")
276 validateWatchedFieldInDatabase(ctx, t, wf)
277 validateWatchedFieldInDatabase(ctx, t, deleted)
278
279
280 validateWatchedFieldObjectsCount(ctx, t, 2)
281 validateWatchedFieldValuesCount(ctx, t, 2)
282
283 return ctx
284 }).
285 Test("delete watched field", func(ctx f2.Context, t *testing.T) f2.Context {
286 deleted.Timestamp = now.Add(time.Microsecond)
287 require.NoError(t, handle.DeleteWatchedField(ctx, deleted))
288
289 validateWatchedFieldDeletedInDatabase(ctx, t, deleted)
290 validateWatchedFieldObjectsCount(ctx, t, 1)
291 validateWatchedFieldValuesCount(ctx, t, 1)
292
293 return ctx
294 }).
295 Feature()
296
297 f2f.Test(t, feat)
298 }
299
300 func TestDeleteOutdatedWatchedFieldObjects(t *testing.T) {
301 var bannerProjectIDs = []string{"foo", "bar", "baz", "neo", "zed"}
302 var handle server.DBHandle
303 var clusterEdgeIDs map[string]uuid.UUID
304 var wfs []model.WatchedField
305 var now = time.Now().UTC().Truncate(time.Microsecond)
306 var feat = f2.NewFeature(t.Name()).
307 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
308 clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
309 handle = server.DBHandle{
310 DB: postgres.FromContextT(ctx, t).DB(),
311 }
312 return ctx
313 }).
314 Test("create and set watched field objects", func(ctx f2.Context, t *testing.T) f2.Context {
315 var apiVersions = []string{
316 "hello.world/v1",
317 "hello.world/v1beta1",
318 "foo.bar/v2",
319 "v1",
320 }
321 var kinds = []string{
322 "Dog",
323 "Cat",
324 "Fish",
325 "Bird",
326 "Zebra",
327 "AntEater",
328 }
329
330 var timestamps = []time.Time{
331 now.Add(-time.Microsecond),
332 now.Add(time.Microsecond),
333 }
334 for i := 0; i < 42; i++ {
335 var wf = model.WatchedField{
336 Cluster: clusterEdgeIDs[bannerProjectIDs[i%len(clusterEdgeIDs)]],
337 APIVersion: apiVersions[i%len(apiVersions)],
338 Kind: kinds[i%len(kinds)],
339 Name: uuid.New().String(),
340 Timestamp: timestamps[i%2],
341 Fields: []model.FieldValue{
342 model.FieldValue{
343 JSONPath: "$.spec.foo",
344 Value: "{\"bar\": \"baz\"}",
345 Missing: false,
346 },
347 },
348 }
349 require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
350 validateWatchedFieldInDatabase(ctx, t, wf)
351 wfs = append(wfs, wf)
352 }
353 validateWatchedFieldObjectsCount(ctx, t, len(wfs))
354 return ctx
355 }).
356 Test("handle a ScrapeMessage that does not mark any objects deleted", func(ctx f2.Context, t *testing.T) f2.Context {
357 for _, cid := range clusterEdgeIDs {
358 var sm = &model.ScrapeMessage{
359 Cluster: cid,
360 StartTime: now.Add(-time.Minute),
361 }
362 require.NoError(t, handle.DeleteOutdatedWatchedFieldObjects(ctx, *sm))
363 }
364 validateWatchedFieldObjectsCount(ctx, t, len(wfs))
365 return ctx
366 }).
367 Test("handle a ScrapeMessage for each cluster to delete outdated objects", func(ctx f2.Context, t *testing.T) f2.Context {
368 for _, cid := range clusterEdgeIDs {
369 var sm = &model.ScrapeMessage{
370 Cluster: cid,
371 StartTime: now,
372 }
373 require.NoError(t, handle.DeleteOutdatedWatchedFieldObjects(ctx, *sm))
374 }
375 return ctx
376 }).
377 Test("verify outdated objects were deleted", func(ctx f2.Context, t *testing.T) f2.Context {
378 var expectedCount int
379 for _, wf := range wfs {
380 if wf.Timestamp.Before(now) {
381
382
383 wf.Timestamp = now
384 validateWatchedFieldDeletedInDatabase(ctx, t, wf)
385 } else {
386
387 validateWatchedFieldInDatabase(ctx, t, wf)
388 expectedCount++
389 }
390 }
391 validateWatchedFieldObjectsCount(ctx, t, expectedCount)
392 return ctx
393 }).
394 Feature()
395
396 f2f.Test(t, feat)
397 }
398
399 func TestGarbageCollectDeletedWatchedFieldObjects(t *testing.T) {
400 var bannerProjectIDs = []string{"foo", "bar", "baz", "neo", "zed"}
401 var handle server.DBHandle
402 var clusterEdgeIDs map[string]uuid.UUID
403 var wfs []model.WatchedField
404 var feat = f2.NewFeature(t.Name()).
405 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
406 clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
407 handle = server.DBHandle{
408 DB: postgres.FromContextT(ctx, t).DB(),
409 }
410 return ctx
411 }).
412 Test("create and set watched field objects", func(ctx f2.Context, t *testing.T) f2.Context {
413 var apiVersions = []string{
414 "hello.world/v1",
415 "hello.world/v1beta1",
416 "foo.bar/v2",
417 "v1",
418 }
419 var kinds = []string{
420 "Dog",
421 "Cat",
422 "Fish",
423 "Bird",
424 "Zebra",
425 "AntEater",
426 }
427 const delta = time.Hour + 3*time.Minute
428 var timestamp = time.Now().UTC().Truncate(time.Microsecond).Add(-delta)
429 for i := 0; i < 40; i++ {
430 var wf = model.WatchedField{
431 Cluster: clusterEdgeIDs[bannerProjectIDs[i%len(clusterEdgeIDs)]],
432 APIVersion: apiVersions[i%len(apiVersions)],
433 Kind: kinds[i%len(kinds)],
434 Name: uuid.New().String(),
435 Timestamp: timestamp,
436 Fields: []model.FieldValue{
437 model.FieldValue{
438 JSONPath: "$.spec.foo",
439 Value: "{\"bar\": \"baz\"}",
440 Missing: false,
441 },
442 },
443 }
444 require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
445 validateWatchedFieldInDatabase(ctx, t, wf)
446
447 if i%4 == 0 {
448
449 wf.Timestamp = timestamp.Add(time.Microsecond)
450
451 require.NoError(t, handle.DeleteWatchedField(ctx, wf), "error marking watched field deleted")
452 validateWatchedFieldDeletedInDatabase(ctx, t, wf)
453 } else if i%2 == 0 {
454
455 wf.Timestamp = timestamp.Add(delta)
456
457 require.NoError(t, handle.DeleteWatchedField(ctx, wf), "error marking watched field deleted")
458 validateWatchedFieldDeletedInDatabase(ctx, t, wf)
459 }
460 wfs = append(wfs, wf)
461 }
462 validateWatchedFieldObjectsCount(ctx, t, len(wfs)/2)
463 return ctx
464 }).
465 Test("garbage collect the deleted watched fields", func(ctx f2.Context, t *testing.T) f2.Context {
466 actual, err := handle.GarbageCollectDeletedWatchedFieldObjects(ctx)
467 require.NoError(t, err, "could not garbage collect outdated watched field objects")
468 require.Equal(t, actual, len(wfs)/4, "did not garbace collect 1/4th of the watched field objects")
469 for i, wf := range wfs {
470 if i%4 == 0 {
471 validateWatchedFieldGarbageCollectedInDatabase(ctx, t, wf)
472 } else if i%2 == 0 {
473 validateWatchedFieldDeletedInDatabase(ctx, t, wf)
474 } else {
475 validateWatchedFieldInDatabase(ctx, t, wf)
476 }
477 }
478 return ctx
479 }).
480 Feature()
481
482 f2f.Test(t, feat)
483 }
484
View as plain text