...
package shoot

import (
	"context"
	"fmt"
	"os"
	"os/signal"

	syscall "golang.org/x/sys/unix"

	configLoader "github.com/joho/godotenv"

	dsHTTP "edge-infra.dev/pkg/edge/datasync/http"
	commonCfg "edge-infra.dev/pkg/edge/datasync/internal/config"
	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
	"edge-infra.dev/pkg/edge/datasync/shoot/config"
	"edge-infra.dev/pkg/edge/datasync/shoot/handler"
	shootHTTP "edge-infra.dev/pkg/edge/datasync/shoot/http"
	publisher "edge-infra.dev/pkg/edge/datasync/shoot/pubsub"
	"edge-infra.dev/pkg/edge/datasync/shoot/worker"
	"edge-infra.dev/pkg/lib/fog"
)

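// Main runs the Shoot data-sync connector: it loads configuration, waits for
// Kafka to be reachable, starts one worker per topic mapping, and serves the
// Prometheus metrics and liveness endpoints until it is told to shut down.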
func Main() {
	log := fog.New().WithName("data-sync-connector")

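	// Outside of Kubernetes (e.g. local development), pull configuration
	// from a .env file via godotenv.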
	if _, exists := os.LookupEnv("KUBERNETES_SERVICE_HOST"); !exists {
		if err := configLoader.Load(); err != nil {
			log.Error(err, "failed to load configuration")
			os.Exit(1)
		}
	}

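	// Load the Shoot connector configuration and the shared datasync configuration.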
	shootConfig, err := config.NewConfig()
	if err != nil {
		log.Error(err, "invalid config for Shoot")
		os.Exit(1)
	}

	internalConfig, err := commonCfg.NewConfig()
	if err != nil {
		log.Error(err, "invalid internal config")
		os.Exit(1)
	}

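	// Topic mappings declare which Kafka source topics are consumed and which
	// target topics they are forwarded to.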
	topicMappings, err := config.GetTopicMappings()
	if err != nil {
		log.Error(err, "failed to get topic mappings")
		os.Exit(1)
	}

	cfg, err := config.GetKafkaClientConfig(shootConfig)
	if err != nil {
		log.Error(err, "failed to get valid kafka client config")
		os.Exit(1)
	}

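	// Verify that the Kafka brokers are reachable before starting any workers.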
	pinger, err := kafkaclient.NewPinger(cfg.Brokers)
	if err != nil {
		log.Error(err, "could not create pinger client")
		os.Exit(1)
	}

	err = pinger.Ping(context.Background())
	if err != nil {
		log.Error(err, "kafka broker is not available")
		os.Exit(1)
	}

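	// Build one worker per topic mapping: each consumes from its Kafka source
	// topic, publishes to the mapped target topic, and reports failures through
	// a failure handler backed by a Kafka producer for that source topic.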
	// A single metrics instance is shared across all workers.
	metrics := worker.NewMetrics()

	workers := make([]worker.Worker, 0, len(topicMappings))
	for _, topicMapping := range topicMappings {
		sourceTopic := topicMapping.Source
		targetTopic := config.GetTargetTopic(sourceTopic)
		pubsubPublisher := publisher.NewPubSub(targetTopic, shootConfig, log)

		// Copy the shared Kafka client config so each worker reads its own topic.
		sourceCfg := *cfg
		sourceCfg.Topic = sourceTopic

		kafkaConsumer, err := kafkaclient.NewConsumer(&sourceCfg)
		if err != nil {
			log.Error(err, "failed to create kafka consumer")
			os.Exit(1)
		}

		producer, err := kafkaclient.NewProducer(&sourceCfg)
		if err != nil {
			log.Error(err, "failed to create kafka producer")
			os.Exit(1)
		}
		failureHandler := handler.NewFailureHandler(producer, sourceTopic)

		w := worker.NewMainWorker(sourceTopic, kafkaConsumer, pubsubPublisher, failureHandler, metrics)
		workers = append(workers, w)
	}

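	// Start every worker in its own goroutine.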
	for _, w := range workers {
		go w.Start()
	}

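	// Serve Prometheus metrics and a liveness endpoint backed by the Kafka pinger.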
	shouldPrometheusRun := make(chan bool)
	prometheusServer := dsHTTP.NewPrometheusServer(internalConfig.PrometheusPort, shouldPrometheusRun)
	go prometheusServer.ListenAndServe()

	shouldLivenessRun := make(chan bool)
	livenessServer := shootHTTP.NewLivenessServer(pinger, shouldLivenessRun, internalConfig)
	go livenessServer.ListenAndServe()

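	// handleShutdown traps SIGINT/SIGTERM, stops the workers, and then signals
	// the exit, prometheus, and liveness channels; Main blocks below until any
	// one of those channels fires.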
	exit := make(chan bool)
	handleShutdown(exit, shouldPrometheusRun, shouldLivenessRun, workers)

	select {
	case <-exit:
		log.Info("OS signal notified to shut down")
	case <-shouldPrometheusRun:
		log.Info("prometheus server notified to shut down")
	case <-shouldLivenessRun:
		log.Info("liveness server notified to shut down")
	}
}

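// handleShutdown registers for SIGINT and SIGTERM; when a signal arrives it
// stops every worker and then signals the exit, prom, and live channels so
// that the rest of the process can shut down.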
func handleShutdown(exit, prom, live chan bool, workers []worker.Worker) {
	logger := fog.New()
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		sig := <-sigs
		logger.Info(fmt.Sprintf("got termination signal: (%+v). stopping all workers", sig))

		for _, w := range workers {
			w.Stop()
		}

		logger.Info("stopped all workers")
		exit <- true
		prom <- true
		live <- true
		logger.Info("exiting")
	}()
}