package chirp import ( "flag" "os" "runtime" "time" configLoader "github.com/joho/godotenv" "edge-infra.dev/pkg/edge/datasync/chirp/provider" "edge-infra.dev/pkg/edge/datasync/chirp/sender" grpcServer "edge-infra.dev/pkg/edge/datasync/chirp/server/grpc" chirpHTTP "edge-infra.dev/pkg/edge/datasync/chirp/server/http" "edge-infra.dev/pkg/edge/datasync/chirp/worker" dsHTTP "edge-infra.dev/pkg/edge/datasync/http" "edge-infra.dev/pkg/edge/datasync/internal/config" "edge-infra.dev/pkg/edge/datasync/internal/metric" "edge-infra.dev/pkg/lib/fog" ) func Main() { runtime.GOMAXPROCS(4) logger := fog.New().WithName("data-sync-messaging") if _, exists := os.LookupEnv("KUBERNETES_SERVICE_HOST"); !exists { if err := configLoader.Load(); err != nil { logger.Error(err, "failed to load configuration") os.Exit(1) } } flag.Parse() var err = config.ValidateConfiguration() if err != nil { logger.Error(err, "failed to start application. missing environment configuration") os.Exit(1) } chirpConfig, err := config.NewConfig() if err != nil { logger.Error(err, "invalid config for Chirp") os.Exit(1) } cfg, err := config.GetKafkaClientConfig(chirpConfig) if err != nil { logger.Error(err, "fail to get valid kafka client config") os.Exit(1) } shouldMsgRun := make(chan bool) httpMsgServer, filePersister := chirpHTTP.NewMsgServer(shouldMsgRun, chirpConfig) go httpMsgServer.ListenAndServe() shouldLivenessRun := make(chan bool) livenessServer := chirpHTTP.NewLivenessServer(shouldLivenessRun, chirpConfig) go livenessServer.ListenAndServe() shouldReadinessRun := make(chan bool) readinessServer := chirpHTTP.NewReadinessServer(shouldReadinessRun, chirpConfig) go readinessServer.ListenAndServe() shouldPrometheusRun := make(chan bool) prometheusServer := dsHTTP.NewPrometheusServer(chirpConfig.PrometheusPort, shouldPrometheusRun) go prometheusServer.ListenAndServe() time.Sleep(3 * time.Second) partitions := chirpConfig.Partition senderFactory := sender.NewSenderFactory(filePersister) messageSender, err := senderFactory.GetInstance(cfg, chirpConfig) if err != nil { logger.Error(err, "fail to get sender") os.Exit(1) } for partition := 0; partition < partitions; partition++ { messageProvider := provider.NewFileSystemProvider(partition, chirpConfig) worker := worker.NewMessageWorker(partition, messageProvider, messageSender, chirpConfig) go worker.DoWork() } outboxMetric := metric.NewOutboxMessagesMetric(chirpConfig) go outboxMetric.StartRecording() grpcServer := grpcServer.NewServer(filePersister, chirpConfig) grpcServer.ListenAndServe() msgRun := <-shouldMsgRun livenessRun := <-shouldLivenessRun readinessRun := <-shouldReadinessRun prometheusRun := <-shouldPrometheusRun logger.Info("should server run", "msgRun", msgRun, "livenessRun", livenessRun, "readinessRun", readinessRun, "prometheusRun", prometheusRun) }