1 package lighthouse
2
3 import (
4 "context"
5 "encoding/json"
6 "time"
7
8 "sigs.k8s.io/controller-runtime/pkg/client/fake"
9
10 pubSub "cloud.google.com/go/pubsub"
11
12 "edge-infra.dev/pkg/edge/lighthouse/config"
13 "edge-infra.dev/pkg/edge/lighthouse/event"
14 "edge-infra.dev/pkg/edge/lighthouse/testutils"
15 "edge-infra.dev/pkg/lib/gcp/pubsub"
16 "edge-infra.dev/pkg/lib/logging"
17 chariotTestutils "edge-infra.dev/test/framework/gcp/pubsub"
18 )
19
20 const (
21 clusterChannel = "cluster_events"
22 bannerChannel = "banner_events"
23 )
24
25 var (
26 cl = fake.NewClientBuilder().Build()
27 )
28
29 func (s *Suite) TestNewWatchTower() {
30 lstner := NewWatchTower()
31 s.NotNil(lstner)
32 }
33
34 func (s *Suite) TestSetLivelinessProbe() {
35 lstner := NewWatchTower()
36 s.NotNil(lstner)
37 lstner.SetLivelinessProbe(time.Minute * 15)
38 s.Equal(lstner.GetLivelinessProbe(), time.Minute*15)
39 }
40
41 func (s *Suite) TestSetMaxConcurrent() {
42 lstner := NewWatchTower()
43 s.NotNil(lstner)
44 s.Equal(lstner.GetMaxConcurrent(), int64(DefaultMaxConcurrent))
45 lstner.SetMaxConcurrent(120)
46 s.Equal(lstner.GetMaxConcurrent(), int64(120))
47 }
48
49 func (s *Suite) TestSetProjectID() {
50 lstner := NewWatchTower()
51 s.NotNil(lstner)
52 lstner.SetProjectID(s.ProjectID)
53 s.Equal(lstner.GetProjectID(), s.ProjectID)
54 }
55
56 func (s *Suite) TestSetTopicID() {
57 lstner := NewWatchTower()
58 s.NotNil(lstner)
59 lstner.SetTopicID(s.TopicID)
60 s.Equal(lstner.GetTopicID(), s.TopicID)
61 }
62
63 func (s *Suite) TestRegisterChannel() {
64 dbCfg := s.SeededPostgres.EdgePostgres()
65 customDBCfg := testutils.DialMe{DB: dbCfg}
66 lstner := NewWatchTower().AddListener(customDBCfg, customDBCfg.ConnectionString(), 0, 0, s.Log)
67 s.NoError(lstner.RegisterChannel(clusterChannel))
68 }
69
70 func (s *Suite) TestUnregisterChannel() {
71 dbCfg := s.SeededPostgres.EdgePostgres()
72 customDBCfg := testutils.DialMe{DB: dbCfg}
73 lstner := NewWatchTower().AddListener(customDBCfg, customDBCfg.ConnectionString(), 0, 0, s.Log)
74 s.NoError(lstner.RegisterChannel(clusterChannel))
75 s.True(lstner.ChannelExists(clusterChannel))
76 s.NoError(lstner.UnregisterChannel(clusterChannel))
77 s.False(lstner.ChannelExists(clusterChannel))
78 }
79
80 func (s *Suite) TestUnregisterAllChannels() {
81 dbCfg := s.SeededPostgres.EdgePostgres()
82 customDBCfg := testutils.DialMe{DB: dbCfg}
83 lstner := NewWatchTower().AddListener(customDBCfg, customDBCfg.ConnectionString(), 0, 0, s.Log)
84 s.NoError(lstner.RegisterChannel(clusterChannel))
85 s.NoError(lstner.RegisterChannel(bannerChannel))
86 s.True(lstner.ChannelExists(clusterChannel))
87 s.True(lstner.ChannelExists(bannerChannel))
88 s.NoError(lstner.UnregisterAllChannels())
89 s.False(lstner.ChannelExists(clusterChannel))
90 s.False(lstner.ChannelExists(bannerChannel))
91 }
92
93 func (s *Suite) TestStream() {
94 ctx := context.Background()
95 insertOperation := "INSERT"
96 dbCfg := s.SeededPostgres.EdgePostgres()
97 customDBCfg := testutils.DialMe{DB: dbCfg}
98 _, err := config.Init(context.Background(), cl)
99 s.NoError(err)
100 lstner := NewWatchTower().AddListener(customDBCfg, customDBCfg.ConnectionString(), 0, 0, s.Log)
101 err = lstner.RegisterChannel(clusterChannel)
102 s.NoError(err)
103 s.True(lstner.ChannelExists(clusterChannel))
104 mockPubsub, err := chariotTestutils.NewMockPubsubServer()
105 s.NoError(err)
106 pubSubService, err := pubsub.NewWithOptions(context.Background(), s.ProjectID, mockPubsub)
107 s.NoError(err)
108 lstner.AddPubsubClient(&pubSubService)
109 pubsubTopic, err := chariotTestutils.CreateMockTopic(context.Background(), s.ProjectID, s.TopicID, mockPubsub)
110 s.NoError(err)
111 lstner.SetProjectID(s.ProjectID)
112 lstner.SetTopicID(pubsubTopic.ID())
113 go lstner.Stream(context.Background(), logging.NewLogger().WithName("lighthouse"))
114 _, err = s.DB.Exec("INSERT INTO clusters(cluster_name, project_id, registered, active, cluster_edge_id, banner_edge_id, bsl_site_id, location) VALUES(gen_random_uuid(), gen_random_uuid(), true, true, gen_random_uuid(), '3396a52c-6a22-4049-9593-5a63b596a100', 'bsl_site_id', 'us-east-1')")
115 s.NoError(err)
116 go func() {
117 for {
118 subscription, err := pubSubService.CreateSubscription(ctx, pubsubTopic.ID(), s.SubscriptionID, s.AckDeadline, "")
119 s.NoError(err)
120 err = subscription.Receive(ctx, func(_ context.Context, msg *pubSub.Message) {
121 evt := event.New()
122 err = json.Unmarshal(msg.Data, &evt)
123 s.NoError(err)
124 s.Equal(evt.Channel, clusterChannel)
125 s.Equal(evt.Operation, insertOperation)
126 s.NotNil(evt.ClusterInfraClusterEdgeID)
127 msg.Ack()
128 })
129 s.NoError(err)
130 }
131 }()
132 }
133
View as plain text