...
1 package chirp
2
3 import (
4 "flag"
5 "os"
6 "runtime"
7 "time"
8
9 configLoader "github.com/joho/godotenv"
10
11 "edge-infra.dev/pkg/edge/datasync/chirp/provider"
12 "edge-infra.dev/pkg/edge/datasync/chirp/sender"
13 grpcServer "edge-infra.dev/pkg/edge/datasync/chirp/server/grpc"
14 chirpHTTP "edge-infra.dev/pkg/edge/datasync/chirp/server/http"
15 "edge-infra.dev/pkg/edge/datasync/chirp/worker"
16 dsHTTP "edge-infra.dev/pkg/edge/datasync/http"
17 "edge-infra.dev/pkg/edge/datasync/internal/config"
18 "edge-infra.dev/pkg/edge/datasync/internal/metric"
19 "edge-infra.dev/pkg/lib/fog"
20 )
21
22 func Main() {
23 runtime.GOMAXPROCS(4)
24 logger := fog.New().WithName("data-sync-messaging")
25
26 if _, exists := os.LookupEnv("KUBERNETES_SERVICE_HOST"); !exists {
27 if err := configLoader.Load(); err != nil {
28 logger.Error(err, "failed to load configuration")
29 os.Exit(1)
30 }
31 }
32
33 flag.Parse()
34 var err = config.ValidateConfiguration()
35 if err != nil {
36 logger.Error(err, "failed to start application. missing environment configuration")
37 os.Exit(1)
38 }
39 chirpConfig, err := config.NewConfig()
40 if err != nil {
41 logger.Error(err, "invalid config for Chirp")
42 os.Exit(1)
43 }
44
45 cfg, err := config.GetKafkaClientConfig(chirpConfig)
46 if err != nil {
47 logger.Error(err, "fail to get valid kafka client config")
48 os.Exit(1)
49 }
50
51 shouldMsgRun := make(chan bool)
52 httpMsgServer, filePersister := chirpHTTP.NewMsgServer(shouldMsgRun, chirpConfig)
53 go httpMsgServer.ListenAndServe()
54
55 shouldLivenessRun := make(chan bool)
56 livenessServer := chirpHTTP.NewLivenessServer(shouldLivenessRun, chirpConfig)
57 go livenessServer.ListenAndServe()
58
59 shouldReadinessRun := make(chan bool)
60 readinessServer := chirpHTTP.NewReadinessServer(shouldReadinessRun, chirpConfig)
61 go readinessServer.ListenAndServe()
62
63 shouldPrometheusRun := make(chan bool)
64 prometheusServer := dsHTTP.NewPrometheusServer(chirpConfig.PrometheusPort, shouldPrometheusRun)
65 go prometheusServer.ListenAndServe()
66
67 time.Sleep(3 * time.Second)
68
69 partitions := chirpConfig.Partition
70 senderFactory := sender.NewSenderFactory(filePersister)
71 messageSender, err := senderFactory.GetInstance(cfg, chirpConfig)
72 if err != nil {
73 logger.Error(err, "fail to get sender")
74 os.Exit(1)
75 }
76
77 for partition := 0; partition < partitions; partition++ {
78 messageProvider := provider.NewFileSystemProvider(partition, chirpConfig)
79 worker := worker.NewMessageWorker(partition, messageProvider, messageSender, chirpConfig)
80 go worker.DoWork()
81 }
82
83 outboxMetric := metric.NewOutboxMessagesMetric(chirpConfig)
84 go outboxMetric.StartRecording()
85
86 grpcServer := grpcServer.NewServer(filePersister, chirpConfig)
87 grpcServer.ListenAndServe()
88
89 msgRun := <-shouldMsgRun
90 livenessRun := <-shouldLivenessRun
91 readinessRun := <-shouldReadinessRun
92 prometheusRun := <-shouldPrometheusRun
93 logger.Info("should server run", "msgRun", msgRun, "livenessRun", livenessRun, "readinessRun", readinessRun, "prometheusRun", prometheusRun)
94 }
95
View as plain text