1 package server_test
2
3 import (
4 "testing"
5 "time"
6
7 server "edge-infra.dev/pkg/edge/psqlinjector"
8 "edge-infra.dev/pkg/f8n/kinform/model"
9 "edge-infra.dev/test/f2"
10 "edge-infra.dev/test/f2/x/postgres"
11
12 "github.com/stretchr/testify/require"
13 )
14
15 func TestTxWithoutRowLocking(t *testing.T) {
16 var bannerProjectIDs = []string{"foo"}
17 var handle server.DBHandle
18 var wf1 = model.WatchedField{
19 APIVersion: "foo/bar1baz2",
20 Kind: "NoisyKind",
21 Timestamp: time.Now().UTC().Truncate(time.Microsecond),
22 Fields: []model.FieldValue{
23 model.FieldValue{
24 JSONPath: "$.spec.foo",
25 Value: "{\"bar\": \"baz\"}",
26 Missing: false,
27 },
28 },
29 }
30 var wf2 = wf1
31
32
33 wf1.Name = "bill"
34 wf2.Name = "frank"
35
36 var feat = f2.NewFeature(t.Name()).
37 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
38 var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
39 wf1.Cluster = clusterEdgeIDs["foo"]
40 wf2.Cluster = clusterEdgeIDs["foo"]
41 return ctx
42 }).
43 Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
44 handle = server.DBHandle{
45 DB: postgres.FromContextT(ctx, t).DB(),
46 }
47 return ctx
48 }).
49 Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
50 require.NoError(t, handle.SetWatchedField(ctx, wf1), "error setting watched field one")
51 require.NoError(t, handle.SetWatchedField(ctx, wf2), "error setting watched field two")
52 validateWatchedFieldInDatabase(ctx, t, wf1)
53 validateWatchedFieldInDatabase(ctx, t, wf2)
54 return ctx
55 }).
56 Test("set watched fields for different objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
57 var now = time.Now().UTC().Truncate(time.Microsecond).Add(time.Microsecond)
58 wf1.Timestamp = now
59 wf2.Timestamp = now
60
61 tx1, err := handle.DB.BeginTx(ctx, nil)
62 require.NoError(t, err)
63 tx2, err := handle.DB.BeginTx(ctx, nil)
64 require.NoError(t, err)
65
66
67 id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
68 require.NoError(t, err, "could not upsert watched field object")
69
70
71 id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
72 require.NoError(t, err, "could not upsert in second transaction")
73 require.NoError(t, server.TxSetWatchedFieldValues(tx2, wf2, id2), "could not write watched field values in second transaction")
74 require.NoError(t, tx2.Commit(), "could not commit second transaction")
75
76
77 require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not write watched field values in first transaction")
78 require.NoError(t, tx1.Commit(), "could not commit first transaction")
79
80
81 require.NotEqual(t, id1.String(), id2.String(), "the object_id should be different")
82 validateWatchedFieldInDatabase(ctx, t, wf1)
83 validateWatchedFieldInDatabase(ctx, t, wf2)
84 return ctx
85 }).
86 Test("set and delete watched fields for different objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
87 var now = time.Now().UTC().Truncate(time.Microsecond).Add(time.Microsecond)
88 wf1.Timestamp = now
89 wf2.Timestamp = now
90
91
92 tx1, err := handle.DB.BeginTx(ctx, nil)
93 require.NoError(t, err)
94 id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
95 require.NoError(t, err, "could not upsert watched field object")
96
97
98 require.NoError(t, handle.DeleteWatchedField(ctx, wf2), "could not upsert in second transaction")
99
100
101 require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not write watched field values in first transaction")
102 require.NoError(t, tx1.Commit(), "could not commit first transaction")
103
104
105 validateWatchedFieldInDatabase(ctx, t, wf1)
106 validateWatchedFieldDeletedInDatabase(ctx, t, wf2)
107 return ctx
108 }).
109 Feature()
110
111 f2f.Test(t, feat)
112 }
113
114 func TestTxRowLockingSequential(t *testing.T) {
115 var bannerProjectIDs = []string{"foo"}
116 var handle server.DBHandle
117 var wf = model.WatchedField{
118 APIVersion: "foo/bar1baz2",
119 Kind: "NoisyKind",
120 Name: "frank",
121 Timestamp: time.Now().UTC().Truncate(time.Microsecond),
122 Fields: []model.FieldValue{
123 model.FieldValue{
124 JSONPath: "$.spec.foo",
125 Value: "{\"bar\": \"baz\"}",
126 Missing: false,
127 },
128 },
129 }
130
131 var feat = f2.NewFeature(t.Name()).
132 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
133 var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
134 wf.Cluster = clusterEdgeIDs["foo"]
135 return ctx
136 }).
137 Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
138 handle = server.DBHandle{
139 DB: postgres.FromContextT(ctx, t).DB(),
140 }
141 return ctx
142 }).
143 Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
144 require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
145 validateWatchedFieldInDatabase(ctx, t, wf)
146 return ctx
147 }).
148 Test("set sequential watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
149 var now = time.Now().UTC().Truncate(time.Microsecond)
150 wf1, wf2 := wf, wf
151 wf1.Timestamp = now
152 wf2.Timestamp = now.Add(time.Microsecond)
153
154 tx1, err := handle.DB.BeginTx(ctx, nil)
155 require.NoError(t, err)
156 tx2, err := handle.DB.BeginTx(ctx, nil)
157 require.NoError(t, err)
158
159
160 id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
161 require.NoError(t, err, "could not upsert watched field object")
162
163 var tx2ch = make(chan error)
164 go func() {
165
166 id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
167 tx2ch <- err
168 tx2ch <- server.TxSetWatchedFieldValues(tx2, wf2, id2)
169 tx2ch <- tx2.Commit()
170 close(tx2ch)
171 }()
172
173 select {
174 case <-time.After(time.Second):
175
176
177 require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
178 case err := <-tx2ch:
179 t.Logf("error returned by second transaction: %v", err)
180 t.Fatal("second transaction should be blocked until the first completes")
181 }
182
183 select {
184 case <-time.After(time.Second):
185
186
187 require.NoError(t, tx1.Commit(), "could not commit first transaction")
188 case err := <-tx2ch:
189 t.Logf("error returned by second transaction: %v", err)
190 t.Fatal("second transaction should be blocked until the first completes")
191 }
192
193 var timeout = time.After(5 * time.Second)
194 for i := 0; i < 3; i++ {
195 select {
196 case <-timeout:
197 t.Fatalf("second transaction did not complete quick enough")
198 case err := <-tx2ch:
199 require.NoError(t, err, "second transaction failed")
200 }
201 }
202
203
204 validateWatchedFieldInDatabase(ctx, t, wf2)
205 return ctx
206 }).
207 Test("set watched field object, then delete it in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
208 var now = time.Now().UTC().Truncate(time.Microsecond)
209 wf1, wf2 := wf, wf
210 wf1.Timestamp = now
211 wf2.Timestamp = now.Add(time.Microsecond)
212
213
214 tx1, err := handle.DB.BeginTx(ctx, nil)
215 require.NoError(t, err)
216 id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
217 require.NoError(t, err, "could not upsert watched field object")
218
219 var tx2ch = make(chan error)
220 go func() {
221
222 tx2ch <- handle.DeleteWatchedField(ctx, wf2)
223 close(tx2ch)
224 }()
225
226 select {
227 case <-time.After(time.Second):
228
229
230 require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
231 case err := <-tx2ch:
232 t.Logf("error returned by second transaction: %v", err)
233 t.Fatal("second transaction should be blocked until the first completes")
234 }
235
236 select {
237 case <-time.After(time.Second):
238
239
240 require.NoError(t, tx1.Commit(), "could not commit first transaction")
241 case err := <-tx2ch:
242 t.Logf("error returned by second transaction: %v", err)
243 t.Fatal("second transaction should be blocked until the first completes")
244 }
245
246 select {
247 case <-time.After(time.Second):
248 t.Fatalf("second transaction did not complete quick enough")
249 case err := <-tx2ch:
250 require.NoError(t, err, "second transaction failed")
251 }
252
253
254 validateWatchedFieldDeletedInDatabase(ctx, t, wf2)
255 return ctx
256 }).
257 Feature()
258
259 f2f.Test(t, feat)
260 }
261
262 func TestTxRowLockingOutdated(t *testing.T) {
263 var bannerProjectIDs = []string{"foo"}
264 var handle server.DBHandle
265 var wf = model.WatchedField{
266 APIVersion: "foo/bar1baz2",
267 Kind: "NoisyKind",
268 Name: "frank",
269 Timestamp: time.Now().UTC().Truncate(time.Microsecond),
270 Fields: []model.FieldValue{
271 model.FieldValue{
272 JSONPath: "$.spec.foo",
273 Value: "{\"bar\": \"baz\"}",
274 Missing: false,
275 },
276 },
277 }
278
279 var feat = f2.NewFeature(t.Name()).
280 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
281 var clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
282 wf.Cluster = clusterEdgeIDs["foo"]
283 return ctx
284 }).
285 Setup("handle", func(ctx f2.Context, t *testing.T) f2.Context {
286 handle = server.DBHandle{
287 DB: postgres.FromContextT(ctx, t).DB(),
288 }
289 return ctx
290 }).
291 Test("set watched field", func(ctx f2.Context, t *testing.T) f2.Context {
292 require.NoError(t, handle.SetWatchedField(ctx, wf), "error setting watched field")
293 validateWatchedFieldInDatabase(ctx, t, wf)
294 return ctx
295 }).
296 Test("set out-of-order watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
297 var now = time.Now().UTC().Truncate(time.Microsecond)
298 wf1, wf2 := wf, wf
299 wf1.Timestamp = now.Add(time.Microsecond)
300 wf2.Timestamp = now
301
302 tx1, err := handle.DB.BeginTx(ctx, nil)
303 require.NoError(t, err)
304 tx2, err := handle.DB.BeginTx(ctx, nil)
305 require.NoError(t, err)
306
307
308 id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
309 require.NoError(t, err, "could not upsert watched field object")
310
311 var tx2ch = make(chan error)
312 go func() {
313
314 id2, err := server.TxSetWatchedFieldObject(tx2, wf2)
315 require.Nil(t, id2, "second transaction should not return an object_id for the outdated watched field object")
316 tx2ch <- err
317 tx2ch <- tx2.Commit()
318 close(tx2ch)
319 }()
320
321 select {
322 case <-time.After(time.Second):
323
324
325 require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
326 case err := <-tx2ch:
327 t.Logf("error returned by second transaction: %v", err)
328 t.Fatal("second transaction should be blocked until the first completes")
329 }
330
331 select {
332 case <-time.After(time.Second):
333
334
335 require.NoError(t, tx1.Commit(), "could not commit first transaction")
336 case err := <-tx2ch:
337 t.Logf("error returned by second transaction: %v", err)
338 t.Fatal("second transaction should be blocked until the first completes")
339 }
340
341 var timeout = time.After(5 * time.Second)
342 select {
343 case <-timeout:
344 t.Fatal("second transaction took too long to complete")
345 case err := <-tx2ch:
346
347 require.Error(t, err, "the second transaction should return an error")
348 require.ErrorIs(t, err, server.ErrIgnoredMessage, "the second transaction should not upsert anything")
349 }
350 select {
351 case <-timeout:
352 t.Fatal("second transaction took too long to commit")
353 case err := <-tx2ch:
354 require.NoError(t, err, "second transaction commit failed")
355 }
356
357
358 validateWatchedFieldInDatabase(ctx, t, wf1)
359 return ctx
360 }).
361 Test("set and delete out-of-order watched field objects in simultaneous transactions", func(ctx f2.Context, t *testing.T) f2.Context {
362 var now = time.Now().UTC().Truncate(time.Microsecond)
363 wf1, wf2 := wf, wf
364 wf1.Timestamp = now.Add(time.Microsecond)
365 wf2.Timestamp = now
366
367
368 tx1, err := handle.DB.BeginTx(ctx, nil)
369 require.NoError(t, err)
370 id1, err := server.TxSetWatchedFieldObject(tx1, wf1)
371 require.NoError(t, err, "could not upsert watched field object")
372
373 var tx2ch = make(chan error)
374 go func() {
375
376 tx2ch <- handle.DeleteWatchedField(ctx, wf2)
377 close(tx2ch)
378 }()
379
380 select {
381 case <-time.After(time.Second):
382
383
384 require.NoError(t, server.TxSetWatchedFieldValues(tx1, wf1, id1), "could not upsert watched field values")
385 case err := <-tx2ch:
386 t.Logf("error returned by second transaction: %v", err)
387 t.Fatal("second transaction should be blocked until the first completes")
388 }
389
390 select {
391 case <-time.After(time.Second):
392
393
394 require.NoError(t, tx1.Commit(), "could not commit first transaction")
395 case err := <-tx2ch:
396 t.Logf("error returned by second transaction: %v", err)
397 t.Fatal("second transaction should be blocked until the first completes")
398 }
399
400 select {
401 case <-time.After(time.Second):
402 t.Fatal("second transaction took too long to complete")
403 case err := <-tx2ch:
404
405 require.ErrorIs(t, err, server.ErrIgnoredMessage, "the second transaction should return an ignored message error for outdated deletes")
406 }
407
408
409 validateWatchedFieldInDatabase(ctx, t, wf1)
410 return ctx
411 }).
412 Feature()
413
414 f2f.Test(t, feat)
415 }
416
View as plain text