...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/main.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp

     1  package chirp
     2  
     3  import (
     4  	"flag"
     5  	"os"
     6  	"runtime"
     7  	"time"
     8  
     9  	configLoader "github.com/joho/godotenv"
    10  
    11  	"edge-infra.dev/pkg/edge/datasync/chirp/provider"
    12  	"edge-infra.dev/pkg/edge/datasync/chirp/sender"
    13  	grpcServer "edge-infra.dev/pkg/edge/datasync/chirp/server/grpc"
    14  	chirpHTTP "edge-infra.dev/pkg/edge/datasync/chirp/server/http"
    15  	"edge-infra.dev/pkg/edge/datasync/chirp/worker"
    16  	dsHTTP "edge-infra.dev/pkg/edge/datasync/http"
    17  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    18  	"edge-infra.dev/pkg/edge/datasync/internal/metric"
    19  	"edge-infra.dev/pkg/lib/fog"
    20  )
    21  
    22  func Main() {
    23  	runtime.GOMAXPROCS(4)
    24  	logger := fog.New().WithName("data-sync-messaging")
    25  
    26  	if _, exists := os.LookupEnv("KUBERNETES_SERVICE_HOST"); !exists {
    27  		if err := configLoader.Load(); err != nil {
    28  			logger.Error(err, "failed to load configuration")
    29  			os.Exit(1)
    30  		}
    31  	}
    32  
    33  	flag.Parse()
    34  	var err = config.ValidateConfiguration()
    35  	if err != nil {
    36  		logger.Error(err, "failed to start application. missing environment configuration")
    37  		os.Exit(1)
    38  	}
    39  	chirpConfig, err := config.NewConfig()
    40  	if err != nil {
    41  		logger.Error(err, "invalid config for Chirp")
    42  		os.Exit(1)
    43  	}
    44  
    45  	cfg, err := config.GetKafkaClientConfig(chirpConfig)
    46  	if err != nil {
    47  		logger.Error(err, "fail to get valid kafka client config")
    48  		os.Exit(1)
    49  	}
    50  
    51  	shouldMsgRun := make(chan bool)
    52  	httpMsgServer, filePersister := chirpHTTP.NewMsgServer(shouldMsgRun, chirpConfig)
    53  	go httpMsgServer.ListenAndServe()
    54  
    55  	shouldLivenessRun := make(chan bool)
    56  	livenessServer := chirpHTTP.NewLivenessServer(shouldLivenessRun, chirpConfig)
    57  	go livenessServer.ListenAndServe()
    58  
    59  	shouldReadinessRun := make(chan bool)
    60  	readinessServer := chirpHTTP.NewReadinessServer(shouldReadinessRun, chirpConfig)
    61  	go readinessServer.ListenAndServe()
    62  
    63  	shouldPrometheusRun := make(chan bool)
    64  	prometheusServer := dsHTTP.NewPrometheusServer(chirpConfig.PrometheusPort, shouldPrometheusRun)
    65  	go prometheusServer.ListenAndServe()
    66  
    67  	time.Sleep(3 * time.Second)
    68  
    69  	partitions := chirpConfig.Partition
    70  	senderFactory := sender.NewSenderFactory(filePersister)
    71  	messageSender, err := senderFactory.GetInstance(cfg, chirpConfig)
    72  	if err != nil {
    73  		logger.Error(err, "fail to get sender")
    74  		os.Exit(1)
    75  	}
    76  
    77  	for partition := 0; partition < partitions; partition++ {
    78  		messageProvider := provider.NewFileSystemProvider(partition, chirpConfig)
    79  		worker := worker.NewMessageWorker(partition, messageProvider, messageSender, chirpConfig)
    80  		go worker.DoWork()
    81  	}
    82  
    83  	outboxMetric := metric.NewOutboxMessagesMetric(chirpConfig)
    84  	go outboxMetric.StartRecording()
    85  
    86  	grpcServer := grpcServer.NewServer(filePersister, chirpConfig)
    87  	grpcServer.ListenAndServe()
    88  
    89  	msgRun := <-shouldMsgRun
    90  	livenessRun := <-shouldLivenessRun
    91  	readinessRun := <-shouldReadinessRun
    92  	prometheusRun := <-shouldPrometheusRun
    93  	logger.Info("should server run", "msgRun", msgRun, "livenessRun", livenessRun, "readinessRun", readinessRun, "prometheusRun", prometheusRun)
    94  }
    95  

View as plain text