...

Source file src/edge-infra.dev/pkg/edge/datasync/shoot/main.go

Documentation: edge-infra.dev/pkg/edge/datasync/shoot

     1  package shoot
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"os/signal"
     8  
     9  	syscall "golang.org/x/sys/unix"
    10  
    11  	configLoader "github.com/joho/godotenv"
    12  
    13  	dsHTTP "edge-infra.dev/pkg/edge/datasync/http"
    14  	commonCfg "edge-infra.dev/pkg/edge/datasync/internal/config"
    15  	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
    16  	"edge-infra.dev/pkg/edge/datasync/shoot/config"
    17  	"edge-infra.dev/pkg/edge/datasync/shoot/handler"
    18  	shootHTTP "edge-infra.dev/pkg/edge/datasync/shoot/http"
    19  	publisher "edge-infra.dev/pkg/edge/datasync/shoot/pubsub"
    20  	"edge-infra.dev/pkg/edge/datasync/shoot/worker"
    21  	"edge-infra.dev/pkg/lib/fog"
    22  )
    23  
    24  func Main() {
    25  	log := fog.New().WithName("data-sync-connector")
    26  
    27  	if _, exists := os.LookupEnv("KUBERNETES_SERVICE_HOST"); !exists {
    28  		if err := configLoader.Load(); err != nil {
    29  			log.Error(err, "failed to load configuration")
    30  			os.Exit(1)
    31  		}
    32  	}
    33  
    34  	shootConfig, err := config.NewConfig()
    35  	if err != nil {
    36  		log.Error(err, "invalid config for Shoot")
    37  		os.Exit(1)
    38  	}
    39  
    40  	internalConfig, err := commonCfg.NewConfig()
    41  	if err != nil {
    42  		log.Error(err, "invalid internal config")
    43  		os.Exit(1)
    44  	}
    45  
    46  	topicMappings, err := config.GetTopicMappings()
    47  	if err != nil {
    48  		log.Error(err, "failed to get topic mappings")
    49  		os.Exit(1)
    50  	}
    51  	cfg, err := config.GetKafkaClientConfig(shootConfig)
    52  	if err != nil {
    53  		log.Error(err, "fail to get valid kafka client config")
    54  		os.Exit(1)
    55  	}
    56  
    57  	pinger, err := kafkaclient.NewPinger(cfg.Brokers)
    58  	if err != nil {
    59  		log.Error(err, "could not create pinger client")
    60  		os.Exit(1)
    61  	}
    62  
    63  	err = pinger.Ping(context.Background())
    64  	if err != nil {
    65  		log.Error(err, "waiting for kafka broker to be available")
    66  		os.Exit(1)
    67  	}
    68  
    69  	workers := make([]worker.Worker, 0)
    70  	var metrics *worker.Metrics
    71  	for i, topicMapping := range topicMappings {
    72  		sourceTopic := topicMapping.Source
    73  		targetTopic := config.GetTargetTopic(sourceTopic)
    74  		pubsubPublisher := publisher.NewPubSub(targetTopic, shootConfig, log)
    75  		sourceCfg := *cfg
    76  		sourceCfg.Topic = sourceTopic
    77  		kafkaConsumer, err := kafkaclient.NewConsumer(&sourceCfg)
    78  		if err != nil {
    79  			log.Error(err, "error creating new NewConsumer")
    80  			os.Exit(1)
    81  		}
    82  
    83  		producer, err := kafkaclient.NewProducer(&sourceCfg)
    84  		if err != nil {
    85  			log.Error(err, "error creating new NewProducer")
    86  			os.Exit(1)
    87  		}
    88  		failureHandler := handler.NewFailureHandler(producer, sourceTopic)
    89  
    90  		if i == 0 {
    91  			metrics = worker.NewMetrics()
    92  		}
    93  
    94  		worker := worker.NewMainWorker(sourceTopic, kafkaConsumer, pubsubPublisher, failureHandler, metrics)
    95  
    96  		workers = append(workers, worker)
    97  	}
    98  
    99  	for _, worker := range workers {
   100  		go worker.Start()
   101  	}
   102  
   103  	shouldPrometheusRun := make(chan bool)
   104  	prometheusServer := dsHTTP.NewPrometheusServer(internalConfig.PrometheusPort, shouldPrometheusRun)
   105  	go prometheusServer.ListenAndServe()
   106  
   107  	shouldLivenessRun := make(chan bool)
   108  	livenessServer := shootHTTP.NewLivenessServer(pinger, shouldLivenessRun, internalConfig)
   109  	go livenessServer.ListenAndServe()
   110  
   111  	exit := make(chan bool)
   112  	handleShutdown(exit, shouldPrometheusRun, shouldLivenessRun, workers)
   113  
   114  	select {
   115  	case <-exit:
   116  		log.Info("OS signal notified to shutdown")
   117  	case <-shouldPrometheusRun:
   118  		log.Info("prometheus server notified to shutdown")
   119  	case <-shouldLivenessRun:
   120  		log.Info("liveness server notified to shutdown")
   121  	}
   122  }
   123  
   124  func handleShutdown(exit chan bool, prom chan bool, live chan bool, workers []worker.Worker) {
   125  	logger := fog.New()
   126  	sigs := make(chan os.Signal, 1)
   127  	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
   128  
   129  	go func() {
   130  		sig := <-sigs
   131  		logger.Info(fmt.Sprintf("got termination signal: (%+v). stopping all workers", sig))
   132  
   133  		for _, worker := range workers {
   134  			worker.Stop()
   135  		}
   136  
   137  		logger.Info("stopped all workers")
   138  		exit <- true
   139  		prom <- true
   140  		live <- true
   141  		logger.Info("exiting")
   142  	}()
   143  }
   144  

View as plain text