1 package server_test
2
3 import (
4 "context"
5 "testing"
6 "time"
7
8 server "edge-infra.dev/pkg/edge/psqlinjector"
9 "edge-infra.dev/pkg/f8n/kinform/model"
10 "edge-infra.dev/test/f2"
11
12 "cloud.google.com/go/pubsub"
13 "github.com/google/uuid"
14 "github.com/stretchr/testify/require"
15 )
16
17 func TestInjectorClusterHeartbeat(t *testing.T) {
18 var cfg = new(server.Config)
19 var bannerProjectIDs = []string{"foo", "bar", "baz"}
20
21
22 var start = time.Now().UTC().Truncate(time.Microsecond)
23 t.Logf("start time %v", start)
24
25
26 var clusterEdgeIDs map[string]uuid.UUID
27
28
29 var doneCtx, stopInjector = context.WithCancel(context.Background())
30 var runErrCh chan error
31
32 var feat = f2.NewFeature(t.Name()).
33 Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
34 return setupConfig(ctx, t, cfg)
35 }).
36 Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
37 return createKinformTopics(ctx, t, cfg, bannerProjectIDs...)
38 }).
39 Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
40 clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
41 return ctx
42 }).
43 Test("run", func(ctx f2.Context, t *testing.T) f2.Context {
44 pi, err := server.New(cfg)
45 require.NoError(t, err, "could not create injector")
46
47 runErrCh = make(chan error, 1)
48 go func() {
49 runErrCh <- pi.Run(doneCtx)
50 }()
51 return ctx
52 }).
53 Test("send heartbeat", func(ctx f2.Context, t *testing.T) f2.Context {
54 return sendHeartbeats(ctx, t, cfg, start, clusterEdgeIDs)
55 }).
56 Test("verify infra_status_updated_at times", func(ctx f2.Context, t *testing.T) f2.Context {
57
58 var timeout = time.After(5 * time.Second)
59 for {
60 select {
61 case <-time.After(50 * time.Millisecond):
62 case <-timeout:
63 t.Fatalf("did not update status times quick enough")
64 }
65
66 var updated = true
67 var updatedTimes = selectInfraStatusUpdatedAt(ctx, t, clusterEdgeIDs)
68 for _, updatedAt := range updatedTimes {
69 if !start.Equal(updatedAt) {
70 updated = false
71 }
72 }
73 if updated {
74 t.Logf("found updated times %v", updatedTimes)
75 return ctx
76 }
77 }
78 }).
79 Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
80 stopInjector()
81
82
83 require.NotNil(t, runErrCh, "psqlinjector never started")
84
85
86 var err = <-runErrCh
87 require.NoErrorf(t, err, "psqlinjector.Run returned unexpected error: %v", err)
88
89 return ctx
90 }).
91 Feature()
92
93 f2f.Test(t, feat)
94 }
95
96 func sendHeartbeats(ctx f2.Context, t *testing.T, cfg *server.Config, timestamp time.Time, clusterEdgeIDs map[string]uuid.UUID) f2.Context {
97
98 require.True(t, timestamp.Equal(timestamp.Truncate(time.Microsecond)), "postgres has a time precision of 1 microsecond")
99 require.Equal(t, timestamp.Location(), time.UTC, "postgres uses UTC")
100
101 var sessionID = uuid.New()
102 for p, ceid := range clusterEdgeIDs {
103 var client = createPubSubClient(ctx, t, p)
104
105 var msg = model.ClusterHeartbeat{
106 Cluster: ceid,
107 Timestamp: timestamp,
108 SessionID: sessionID,
109 }
110
111 var data, _ = msg.Data()
112
113 result := client.Topic(cfg.TopicID).Publish(ctx, &pubsub.Message{
114 Attributes: map[string]string{
115 model.AttrPayloadType: msg.PayloadType(),
116 model.AttrClusterUUID: ceid.String(),
117 },
118 Data: data,
119 })
120
121 _, err := result.Get(ctx)
122 require.NoError(t, err, "error publishing heartbeat")
123
124 client.Topic(cfg.TopicID).Flush()
125 }
126 return ctx
127 }
128
View as plain text