...
1 package lighthouse
2
3 import (
4 "context"
5
6 "edge-infra.dev/pkg/edge/lighthouse/config"
7 "edge-infra.dev/pkg/lib/gcp/pubsub"
8 "edge-infra.dev/pkg/lib/logging"
9 )
10
11
12 func Run() error {
13 log := logging.NewLogger().WithName("lighthouse")
14 ctx := context.Background()
15
16 syscfg, err := config.Init(ctx, nil)
17 if err != nil {
18 log.Error(err, "failed to init system configuration")
19 return err
20 }
21
22 manager, err := pubsub.NewWithOptions(ctx, syscfg.ProjectID)
23 if err != nil {
24 log.Error(err, "failed to setup pubsub client")
25 return err
26 }
27
28 lstner := NewWatchTower().
29 SetProjectID(syscfg.ProjectID).
30 SetTopicID(syscfg.TopicID).
31 AddPubsubClient(manager).
32 AddListener(syscfg.DatabaseCfg, syscfg.DatabaseCfg.ConnectionString(true), 0, 0, log)
33
34 watch, err := WatchCfg(ctx, syscfg, lstner, log)
35 if err != nil {
36 log.Error(err, "failed to watch lighthouse configmap")
37 return err
38 }
39 defer watch.Stop()
40
41 log.Info("Lighthouse has started and is listening to Postgres notifications and events...")
42 lstner.Stream(ctx, log)
43 return nil
44 }
45
View as plain text