package shoot

import (
	"context"
	"fmt"
	"os"
	"os/signal"

	syscall "golang.org/x/sys/unix"

	configLoader "github.com/joho/godotenv"

	dsHTTP "edge-infra.dev/pkg/edge/datasync/http"
	commonCfg "edge-infra.dev/pkg/edge/datasync/internal/config"
	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
	"edge-infra.dev/pkg/edge/datasync/shoot/config"
	"edge-infra.dev/pkg/edge/datasync/shoot/handler"
	shootHTTP "edge-infra.dev/pkg/edge/datasync/shoot/http"
	publisher "edge-infra.dev/pkg/edge/datasync/shoot/pubsub"
	"edge-infra.dev/pkg/edge/datasync/shoot/worker"
	"edge-infra.dev/pkg/lib/fog"
)

func Main() {
	log := fog.New().WithName("data-sync-connector")

	// Outside a Kubernetes cluster, load configuration from a local .env file.
	if _, exists := os.LookupEnv("KUBERNETES_SERVICE_HOST"); !exists {
		if err := configLoader.Load(); err != nil {
			log.Error(err, "failed to load configuration")
			os.Exit(1)
		}
	}

	shootConfig, err := config.NewConfig()
	if err != nil {
		log.Error(err, "invalid config for Shoot")
		os.Exit(1)
	}

	internalConfig, err := commonCfg.NewConfig()
	if err != nil {
		log.Error(err, "invalid internal config")
		os.Exit(1)
	}

	topicMappings, err := config.GetTopicMappings()
	if err != nil {
		log.Error(err, "failed to get topic mappings")
		os.Exit(1)
	}

	cfg, err := config.GetKafkaClientConfig(shootConfig)
	if err != nil {
		log.Error(err, "failed to get valid kafka client config")
		os.Exit(1)
	}

	// Verify the Kafka brokers are reachable before starting any workers.
	pinger, err := kafkaclient.NewPinger(cfg.Brokers)
	if err != nil {
		log.Error(err, "could not create pinger client")
		os.Exit(1)
	}

	if err := pinger.Ping(context.Background()); err != nil {
		log.Error(err, "kafka broker is not available")
		os.Exit(1)
	}

	// Build one worker per topic mapping: each worker consumes a source Kafka
	// topic, publishes to the mapped target topic via Pub/Sub, and routes
	// failed messages back to Kafka through the failure handler.
	workers := make([]worker.Worker, 0, len(topicMappings))
	var metrics *worker.Metrics
	for i, topicMapping := range topicMappings {
		sourceTopic := topicMapping.Source
		targetTopic := config.GetTargetTopic(sourceTopic)
		pubsubPublisher := publisher.NewPubSub(targetTopic, shootConfig, log)

		sourceCfg := *cfg
		sourceCfg.Topic = sourceTopic

		kafkaConsumer, err := kafkaclient.NewConsumer(&sourceCfg)
		if err != nil {
			log.Error(err, "failed to create kafka consumer")
			os.Exit(1)
		}

		producer, err := kafkaclient.NewProducer(&sourceCfg)
		if err != nil {
			log.Error(err, "failed to create kafka producer")
			os.Exit(1)
		}

		failureHandler := handler.NewFailureHandler(producer, sourceTopic)

		// Metrics are shared across all workers; create them only once.
		if i == 0 {
			metrics = worker.NewMetrics()
		}

		w := worker.NewMainWorker(sourceTopic, kafkaConsumer, pubsubPublisher, failureHandler, metrics)
		workers = append(workers, w)
	}

	for _, w := range workers {
		go w.Start()
	}

	// Start the Prometheus metrics server and the liveness server. Each server
	// signals on its channel when it stops, which triggers process shutdown.
	shouldPrometheusRun := make(chan bool)
	prometheusServer := dsHTTP.NewPrometheusServer(internalConfig.PrometheusPort, shouldPrometheusRun)
	go prometheusServer.ListenAndServe()

	shouldLivenessRun := make(chan bool)
	livenessServer := shootHTTP.NewLivenessServer(pinger, shouldLivenessRun, internalConfig)
	go livenessServer.ListenAndServe()

	exit := make(chan bool)
	handleShutdown(exit, shouldPrometheusRun, shouldLivenessRun, workers)

	// Block until either an OS signal or one of the HTTP servers requests shutdown.
	select {
	case <-exit:
		log.Info("OS signal notified to shutdown")
	case <-shouldPrometheusRun:
		log.Info("prometheus server notified to shutdown")
	case <-shouldLivenessRun:
		log.Info("liveness server notified to shutdown")
	}
}

// handleShutdown listens for SIGINT/SIGTERM, stops all workers, and then
// notifies the exit, prometheus, and liveness channels.
func handleShutdown(exit chan bool, prom chan bool, live chan bool, workers []worker.Worker) {
	logger := fog.New()
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		sig := <-sigs
		logger.Info(fmt.Sprintf("got termination signal: (%+v). stopping all workers", sig))
		for _, w := range workers {
			w.Stop()
		}
		logger.Info("stopped all workers")
		exit <- true
		prom <- true
		live <- true
		logger.Info("exiting")
	}()
}
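
// The loop above only relies on workers exposing Start and Stop. As a minimal
// sketch (an assumption inferred from usage here, not the actual definition in
// edge-infra.dev/pkg/edge/datasync/shoot/worker), the interface would look like:
//
//	type Worker interface {
//		Start() // consume from Kafka and publish until stopped
//		Stop()  // signal the worker to drain and return
//	}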