1 package config
2
3 import (
4 "encoding/json"
5 "errors"
6 "flag"
7 "fmt"
8 "os"
9 "strings"
10 "time"
11
12 "github.com/peterbourgon/ff/v3"
13
14 "edge-infra.dev/pkg/edge/datasync/kafkaclient"
15 )
16
17
18 const (
19 outboxPath = "OUTBOX_PATH"
20 messageRouting = "MESSAGE_ROUTING"
21 sharedKey = "DATA_SYNC_SHARED_KEY"
22 secretKey = "DATA_SYNC_SECRET_KEY"
23 appKey = "DATA_SYNC_APP_KEY"
24 targetSettings = "TARGET_SETTINGS"
25 organizationID = "ORGANIZATION_ID"
26 organizationName = "ORGANIZATION_NAME"
27 siteID = "SITE_ID"
28 siteName = "SITE_NAME"
29 target = "TARGET"
30 )
31
32
33 type Config struct {
34 GrpcPort string
35 MsgPort string
36 LivenessPort string
37 ReadinessPort string
38 PrometheusPort string
39 RunMessagingService bool
40 KafkaProducerTimeout int
41 WorkerPolling int
42 RunMessageWorker bool
43 MessageRouting string
44 Partition int
45 OutboxPath string
46 }
47
48 type DSConfig struct {
49 OrganizationID string
50 OrganizationName string
51 SiteID string
52 SiteName string
53 }
54
55 var cfg DSConfig
56
57 func (c *Config) BindFlags(fs *flag.FlagSet) {
58 fs.StringVar(
59 &c.GrpcPort,
60 "grpc-port",
61 "",
62 "GRPC Port Value")
63
64 fs.StringVar(
65 &c.MsgPort,
66 "msg-service-port",
67 "",
68 "Message Service Port")
69
70 fs.StringVar(
71 &c.LivenessPort,
72 "liveness-port",
73 "",
74 "Liveness Port")
75
76 fs.StringVar(
77 &c.ReadinessPort,
78 "readiness-port",
79 "",
80 "Readiness Port")
81
82 fs.StringVar(
83 &c.PrometheusPort,
84 "prometheus-port",
85 "",
86 "Prometheus Port")
87
88 fs.BoolVar(
89 &c.RunMessagingService,
90 "run-messaging-service",
91 true,
92 "Flag for running messaging service")
93
94 fs.IntVar(
95 &c.KafkaProducerTimeout,
96 "kafka-producer-timeout-ms",
97 60000,
98 "Value for the Kafka Producer Timeout in ms")
99
100 fs.IntVar(
101 &c.WorkerPolling,
102 "worker-polling",
103 5,
104 "Value for the Worker Polling")
105
106 fs.BoolVar(
107 &c.RunMessageWorker,
108 "run-message-worker",
109 true,
110 "Flag for running message worker")
111
112 fs.IntVar(
113 &c.Partition,
114 "partition",
115 5,
116 "Partition Value for Chirp Config")
117
118 fs.StringVar(
119 &c.MessageRouting,
120 "message-routing",
121 "",
122 "Value for the Message Routing")
123
124 fs.StringVar(
125 &c.OutboxPath,
126 "outbox-path",
127 "",
128 "Value for the Outbox Path")
129 }
130
131 type MessageRoute struct {
132 Type string
133 Route string
134 Topic string
135 BulkSize int
136 Compression bool
137 Signing bool
138 }
139
140 type Message struct {
141 Type string `json:"type"`
142 Compression bool `json:"compression"`
143 Signing bool `json:"signing"`
144 Route string `json:"route"`
145 }
146 type MessageRouteConfig struct {
147 Routes []Route `json:"routes"`
148 Messages []Message `json:"messages"`
149 }
150 type Route struct {
151 ID string `json:"id"`
152 Topic string `json:"topic"`
153 BulkSize int `json:"bulk_size"`
154 }
155 type KafkaSSL struct {
156 KeystoreLocation string
157 KeystorePassword string
158 CaLocation string
159 EnableCertVerification string
160 IdentificationAlgorithm string
161 }
162
163 func (c *Config) Validate() error {
164 if c.GrpcPort == "" {
165 return fmt.Errorf("gRPC port cannot be empty")
166 }
167
168 if c.MsgPort == "" {
169 return fmt.Errorf("message service port cannot be empty")
170 }
171
172 if c.LivenessPort == "" {
173 return fmt.Errorf("liveness probe cannot be empty")
174 }
175
176 if c.ReadinessPort == "" {
177 return fmt.Errorf("readiness probe cannot be empty")
178 }
179
180 if c.PrometheusPort == "" {
181 return fmt.Errorf("prometheus port cannot be empty")
182 }
183
184 if c.KafkaProducerTimeout == 0 {
185 return fmt.Errorf("kafka producer timeout cannot be empty")
186 }
187
188 if c.WorkerPolling == 0 {
189 return fmt.Errorf("worker polling cannot be empty")
190 }
191 if c.Partition == 0 {
192 return fmt.Errorf("partition cannot be empty")
193 }
194
195 if c.MessageRouting == "" {
196 return fmt.Errorf("message routing cannot be empty")
197 }
198
199 if c.OutboxPath == "" {
200 return fmt.Errorf("outbox-path cannot be empty")
201 }
202 return nil
203 }
204
205 func NewConfig() (*Config, error) {
206 cfg := &Config{}
207
208 fs := flag.NewFlagSet("internal", flag.ExitOnError)
209
210 cfg.BindFlags(fs)
211
212 if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true)); err != nil {
213 return cfg, err
214 }
215
216 if err := cfg.Validate(); err != nil {
217 return cfg, err
218 }
219
220 return cfg, nil
221 }
222
223 func ValidateConfiguration() error {
224 result := make([]string, 0)
225
226 cfg.OrganizationID = os.Getenv(organizationID)
227 cfg.OrganizationName = os.Getenv(organizationName)
228 cfg.SiteID = os.Getenv(siteID)
229 cfg.SiteName = os.Getenv(siteName)
230
231 if cfg.OrganizationID == "" || cfg.OrganizationName == "" || cfg.SiteID == "" || cfg.SiteName == "" {
232 result = append(result, organizationID, organizationName, siteID, siteName)
233 }
234
235 if GetTarget() == "kafka" {
236 _, err := GetKafkaSettings()
237 if err != nil {
238 result = append(result, targetSettings)
239 }
240 }
241
242 if GetTarget() == "pubsub" {
243 _, err := GetPubSubSettings()
244 if err != nil {
245 result = append(result, targetSettings)
246 }
247 }
248
249 if len(result) > 0 {
250 return fmt.Errorf("missing the following environment variables: %v", strings.Join(result, ", "))
251 }
252
253 return nil
254 }
255
256 func GetRouteConfig(messageType string, chirpConfig *Config) *MessageRoute {
257 messageRouteMap := getMessageRoutePerMessageType(chirpConfig)
258
259 elem, ok := messageRouteMap[messageType]
260 if !ok {
261 r := messageRouteMap["*"]
262 r.Type = messageType
263 return r
264 }
265
266 return elem
267 }
268
269 func GetRoutes(chirpConfig *Config) []string {
270 var result []string
271
272 configs, err := getRouteConfigs(chirpConfig)
273 if err != nil {
274 return result
275 }
276
277 for i := 0; i < len(configs.Routes); i++ {
278 result = append(result, configs.Routes[i].ID)
279 }
280
281 return result
282 }
283
284 func GetBulkSizes(chirpConfig *Config) map[string]int {
285 bulkSizes := make(map[string]int)
286
287 configs, err := getRouteConfigs(chirpConfig)
288 if err != nil {
289 return bulkSizes
290 }
291
292 for i := 0; i < len(configs.Routes); i++ {
293 bulkSizes[configs.Routes[i].ID] = configs.Routes[i].BulkSize
294 }
295
296 return bulkSizes
297 }
298
299 func GetTopics(chirpConfig *Config) map[string]string {
300 topics := make(map[string]string)
301
302 configs, err := getRouteConfigs(chirpConfig)
303 if err != nil {
304 return topics
305 }
306
307 for i := 0; i < len(configs.Routes); i++ {
308 topics[configs.Routes[i].ID] = configs.Routes[i].Topic
309 }
310
311 return topics
312 }
313
314 func GetOutboxPath(chirpConfig *Config) string {
315 o := chirpConfig.OutboxPath
316
317
318 if l := len(o) - 1; l >= 0 && o[l] == '/' {
319 o = o[:l]
320 }
321
322 return o
323 }
324
325 func ShouldCompressMessage(messageType string, chirpConfig *Config) bool {
326 cfg := GetRouteConfig(messageType, chirpConfig)
327 return cfg.Compression
328 }
329
330 func ShouldSignMessage(messageType string, chirpConfig *Config) bool {
331 cfg := GetRouteConfig(messageType, chirpConfig)
332 return cfg.Signing
333 }
334
335 func GetSharedKey() string {
336 return os.Getenv(sharedKey)
337 }
338
339 func GetSecretKey() string {
340 return os.Getenv(secretKey)
341 }
342
343 func GetAppKey() string {
344 return os.Getenv(appKey)
345 }
346
347 func GetTarget() string {
348 t := os.Getenv(target)
349 if t == "" {
350 return "kafka"
351 }
352
353 return t
354 }
355
356 func GetKafkaSettings() (KafkaSettings, error) {
357 var settings KafkaSettings
358
359 err := json.Unmarshal([]byte(os.Getenv("TARGET_SETTINGS")), &settings)
360 if err != nil {
361 return settings, fmt.Errorf("invalid Kafka TARGET_SETTINGS env variable")
362 }
363
364 return settings, nil
365 }
366
367 func GetPubSubSettings() (PubSubSettings, error) {
368 var settings PubSubSettings
369
370 err := json.Unmarshal([]byte(os.Getenv("TARGET_SETTINGS")), &settings)
371 if err != nil {
372 return settings, fmt.Errorf("invalid PubSubSettings TARGET_SETTINGS env variable")
373 }
374
375 return settings, nil
376 }
377
378 type KafkaSettings struct {
379 KafkaEndpoint string `json:"kafka_endpoint"`
380 MessageMaxBytes int32 `json:"message_max_bytes"`
381 SecurityProtocol string `json:"security_protocol"`
382 KeystoreLocation string `json:"keystore_location"`
383 KeystorePassword string `json:"keystore_password"`
384 CaLocation string `json:"ssl_ca_location"`
385 EnableCertVerification bool `json:"ssl_enable_cert_identification"`
386 IdentificationAlgorithm string `json:"ssl_endpoint_identification_algorithm"`
387 }
388
389 type PubSubSettings struct {
390 ProjectID string `json:"project_id"`
391 }
392
393 func getRoutesMap(chirpConfig *Config) map[string]Route {
394 result := make(map[string]Route)
395
396 routeConfigs, _ := getRouteConfigs(chirpConfig)
397
398 for _, route := range routeConfigs.Routes {
399 result[route.ID] = route
400 }
401
402 return result
403 }
404
405 func getMessageRoutePerMessageType(chirpConfig *Config) map[string]*MessageRoute {
406 result := make(map[string]*MessageRoute)
407
408 routeConfigs, _ := getRouteConfigs(chirpConfig)
409 routesMap := getRoutesMap(chirpConfig)
410
411 var messageRoute *MessageRoute
412 for _, message := range routeConfigs.Messages {
413 route := routesMap[message.Route]
414
415 messageRoute = &MessageRoute{
416 Type: message.Type,
417 Route: message.Route,
418 Compression: message.Compression,
419 Signing: message.Signing,
420 Topic: route.Topic,
421 BulkSize: route.BulkSize,
422 }
423
424 result[message.Type] = messageRoute
425 }
426
427 return result
428 }
429
430 func getRouteConfigs(chirpConfig *Config) (MessageRouteConfig, error) {
431 var result MessageRouteConfig
432 err := json.Unmarshal([]byte(chirpConfig.MessageRouting), &result)
433 if err != nil {
434 return result, errors.New("invalid MESSAGE_ROUTING environment variable")
435 }
436 return result, nil
437 }
438
439 func OrganizationName() string {
440 return cfg.OrganizationName
441 }
442
443 func OrganizationID() string {
444 return cfg.OrganizationID
445 }
446
447 func SiteID() string {
448 return cfg.SiteID
449 }
450
451 func SiteName() string {
452 return cfg.SiteName
453 }
454
455 func GetKafkaClientConfig(chirpConfig *Config) (*kafkaclient.Config, error) {
456 settings, err := GetKafkaSettings()
457 if err != nil {
458 return nil, err
459 }
460 produceTimeOut := time.Duration(chirpConfig.KafkaProducerTimeout) * time.Millisecond
461 return &kafkaclient.Config{
462 Brokers: []string{settings.KafkaEndpoint},
463 MessageMaxBytes: settings.MessageMaxBytes,
464 ProduceTimeout: produceTimeOut,
465
466 GroupID: "data-sync___",
467 }, nil
468 }
469
View as plain text