// Package config reads the data-sync configuration from command-line flags
// and environment variables and exposes helpers for querying it.
package config

import (
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/peterbourgon/ff/v3"

	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
)

// Names of the environment variables used by this package. They double as
// the identifiers listed in validation error messages.
const (
	outboxPath       = "OUTBOX_PATH"
	messageRouting   = "MESSAGE_ROUTING"
	sharedKey        = "DATA_SYNC_SHARED_KEY"
	secretKey        = "DATA_SYNC_SECRET_KEY"
	appKey           = "DATA_SYNC_APP_KEY"
	targetSettings   = "TARGET_SETTINGS"
	organizationID   = "ORGANIZATION_ID"
	organizationName = "ORGANIZATION_NAME"
	siteID           = "SITE_ID"
	siteName         = "SITE_NAME"
	target           = "TARGET"
)

// Config holds the flag-driven settings of the service.
//
// TODO: https://github.com/ncr-swt-retail/edge-roadmap/issues/10011
type Config struct {
	GrpcPort            string // --grpc-port
	MsgPort             string // --msg-service-port
	LivenessPort        string // --liveness-port
	ReadinessPort       string // --readiness-port
	PrometheusPort      string // --prometheus-port
	RunMessagingService bool   // --run-messaging-service
	KafkaProducerTimeout int   // --kafka-producer-timeout-ms, in milliseconds
	WorkerPolling       int    // --worker-polling
	RunMessageWorker    bool   // --run-message-worker
	MessageRouting      string // --message-routing, JSON document (see MessageRouteConfig)
	Partition           int    // --partition
	OutboxPath          string // --outbox-path
}

// DSConfig holds the organization and site identity read from the
// environment by ValidateConfiguration.
type DSConfig struct {
	OrganizationID   string
	OrganizationName string
	SiteID           string
	SiteName         string
}

// cfg is the package-level identity configuration. It is populated by
// ValidateConfiguration and read by the OrganizationID, OrganizationName,
// SiteID and SiteName accessors.
var cfg DSConfig

// BindFlags registers every Config field on fs together with its default
// value and help text.
func (c *Config) BindFlags(fs *flag.FlagSet) {
	fs.StringVar(&c.GrpcPort, "grpc-port", "", "GRPC Port Value")
	fs.StringVar(&c.MsgPort, "msg-service-port", "", "Message Service Port")
	fs.StringVar(&c.LivenessPort, "liveness-port", "", "Liveness Port")
	fs.StringVar(&c.ReadinessPort, "readiness-port", "", "Readiness Port")
	fs.StringVar(&c.PrometheusPort, "prometheus-port", "", "Prometheus Port")
	fs.BoolVar(&c.RunMessagingService, "run-messaging-service", true, "Flag for running messaging service")
	fs.IntVar(&c.KafkaProducerTimeout, "kafka-producer-timeout-ms", 60000, "Value for the Kafka Producer Timeout in ms")
	fs.IntVar(&c.WorkerPolling, "worker-polling", 5, "Value for the Worker Polling")
	fs.BoolVar(&c.RunMessageWorker, "run-message-worker", true, "Flag for running message worker")
	fs.IntVar(&c.Partition, "partition", 5, "Partition Value for Chirp Config")
	fs.StringVar(&c.MessageRouting, "message-routing", "", "Value for the Message Routing")
	fs.StringVar(&c.OutboxPath, "outbox-path", "", "Value for the Outbox Path")
}

// MessageRoute is the resolved routing information for one message type: the
// per-message settings joined with the topic and bulk size of its route.
type MessageRoute struct {
	Type        string
	Route       string
	Topic       string
	BulkSize    int
	Compression bool
	Signing     bool
}

// Message is one entry of the "messages" array in the routing JSON.
type Message struct {
	Type        string `json:"type"`
	Compression bool   `json:"compression"`
	Signing     bool   `json:"signing"`
	Route       string `json:"route"`
}

// MessageRouteConfig is the top-level shape of the routing JSON stored in
// Config.MessageRouting.
type MessageRouteConfig struct {
	Routes   []Route   `json:"routes"`
	Messages []Message `json:"messages"`
}

// Route is one entry of the "routes" array in the routing JSON.
type Route struct {
	ID       string `json:"id"`
	Topic    string `json:"topic"`
	BulkSize int    `json:"bulk_size"`
}

// KafkaSSL groups the SSL-related Kafka settings as plain strings.
type KafkaSSL struct {
	KeystoreLocation        string
	KeystorePassword        string
	CaLocation              string
	EnableCertVerification  string
	IdentificationAlgorithm string
}

// Validate reports the first required setting that is still at its zero
// value ("" for strings, 0 for integers), or nil when all are set.
func (c *Config) Validate() error {
	// Checks run in declaration order; only the first failure is reported.
	checks := []struct {
		unset bool
		msg   string
	}{
		{c.GrpcPort == "", "gRPC port cannot be empty"},
		{c.MsgPort == "", "message service port cannot be empty"},
		{c.LivenessPort == "", "liveness probe cannot be empty"},
		{c.ReadinessPort == "", "readiness probe cannot be empty"},
		{c.PrometheusPort == "", "prometheus port cannot be empty"},
		{c.KafkaProducerTimeout == 0, "kafka producer timeout cannot be empty"},
		{c.WorkerPolling == 0, "worker polling cannot be empty"},
		{c.Partition == 0, "partition cannot be empty"},
		{c.MessageRouting == "", "message routing cannot be empty"},
		{c.OutboxPath == "", "outbox-path cannot be empty"},
	}
	for _, chk := range checks {
		if chk.unset {
			return errors.New(chk.msg)
		}
	}
	return nil
}

// NewConfig builds a Config from os.Args[1:] and unprefixed environment
// variables (via ff.Parse) and validates it. On failure the partially
// populated Config is returned together with the error.
func NewConfig() (*Config, error) {
	c := &Config{}
	fs := flag.NewFlagSet("internal", flag.ExitOnError)
	c.BindFlags(fs)

	if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true)); err != nil {
		return c, err
	}
	if err := c.Validate(); err != nil {
		return c, err
	}
	return c, nil
}

// ValidateConfiguration loads the organization and site identity from the
// environment into the package-level configuration and checks the
// target-specific TARGET_SETTINGS value.
//
// The returned error lists every identity variable that is empty, plus
// TARGET_SETTINGS when it cannot be decoded for the selected target
// ("kafka" or "pubsub"). Other targets skip the settings check.
func ValidateConfiguration() error {
	cfg.OrganizationID = os.Getenv(organizationID)
	cfg.OrganizationName = os.Getenv(organizationName)
	cfg.SiteID = os.Getenv(siteID)
	cfg.SiteName = os.Getenv(siteName)

	// Report only the variables that are actually empty, so the message
	// points at the real problem instead of listing all four.
	var missing []string
	for _, v := range []struct{ name, value string }{
		{organizationID, cfg.OrganizationID},
		{organizationName, cfg.OrganizationName},
		{siteID, cfg.SiteID},
		{siteName, cfg.SiteName},
	} {
		if v.value == "" {
			missing = append(missing, v.name)
		}
	}

	switch GetTarget() {
	case "kafka":
		if _, err := GetKafkaSettings(); err != nil {
			missing = append(missing, targetSettings)
		}
	case "pubsub":
		if _, err := GetPubSubSettings(); err != nil {
			missing = append(missing, targetSettings)
		}
	}

	if len(missing) > 0 {
		return fmt.Errorf("missing the following environment variables: %v", strings.Join(missing, ", "))
	}
	return nil
}

// GetRouteConfig returns the routing information for messageType.
//
// Unknown types fall back to the "*" wildcard entry, with Type set to
// messageType. If there is no wildcard entry either (for example because the
// routing JSON is invalid), a MessageRoute carrying only the type is
// returned, so callers never receive nil.
func GetRouteConfig(messageType string, chirpConfig *Config) *MessageRoute {
	routes := getMessageRoutePerMessageType(chirpConfig)
	if route, ok := routes[messageType]; ok {
		return route
	}
	// The map is rebuilt on every call, so adjusting the wildcard entry in
	// place does not leak into later lookups.
	if fallback, ok := routes["*"]; ok {
		fallback.Type = messageType
		return fallback
	}
	return &MessageRoute{Type: messageType}
}

// GetRoutes returns the IDs of all configured routes in configuration order.
// It returns nil when the routing JSON cannot be parsed.
func GetRoutes(chirpConfig *Config) []string {
	var ids []string
	configs, err := getRouteConfigs(chirpConfig)
	if err != nil {
		return ids
	}
	for _, route := range configs.Routes {
		ids = append(ids, route.ID)
	}
	return ids
}

// GetBulkSizes maps each route ID to its bulk size. The map is empty when
// the routing JSON cannot be parsed.
func GetBulkSizes(chirpConfig *Config) map[string]int {
	bulkSizes := make(map[string]int)
	configs, err := getRouteConfigs(chirpConfig)
	if err != nil {
		return bulkSizes
	}
	for _, route := range configs.Routes {
		bulkSizes[route.ID] = route.BulkSize
	}
	return bulkSizes
}

// GetTopics maps each route ID to its topic. The map is empty when the
// routing JSON cannot be parsed.
func GetTopics(chirpConfig *Config) map[string]string {
	topics := make(map[string]string)
	configs, err := getRouteConfigs(chirpConfig)
	if err != nil {
		return topics
	}
	for _, route := range configs.Routes {
		topics[route.ID] = route.Topic
	}
	return topics
}

// GetOutboxPath returns the configured outbox path with a single trailing
// slash removed; calling code assumes the path has no trailing slash.
func GetOutboxPath(chirpConfig *Config) string {
	return strings.TrimSuffix(chirpConfig.OutboxPath, "/")
}

// ShouldCompressMessage reports whether messages of messageType are
// configured for compression.
func ShouldCompressMessage(messageType string, chirpConfig *Config) bool {
	return GetRouteConfig(messageType, chirpConfig).Compression
}

// ShouldSignMessage reports whether messages of messageType are configured
// for signing.
func ShouldSignMessage(messageType string, chirpConfig *Config) bool {
	return GetRouteConfig(messageType, chirpConfig).Signing
}

// GetSharedKey returns the value of the DATA_SYNC_SHARED_KEY environment
// variable.
func GetSharedKey() string {
	return os.Getenv(sharedKey)
}

// GetSecretKey returns the value of the DATA_SYNC_SECRET_KEY environment
// variable.
func GetSecretKey() string {
	return os.Getenv(secretKey)
}

// GetAppKey returns the value of the DATA_SYNC_APP_KEY environment variable.
func GetAppKey() string {
	return os.Getenv(appKey)
}

// GetTarget returns the value of the TARGET environment variable, defaulting
// to "kafka" when it is unset or empty.
func GetTarget() string {
	if t := os.Getenv(target); t != "" {
		return t
	}
	return "kafka"
}

// GetKafkaSettings decodes the TARGET_SETTINGS environment variable as
// KafkaSettings JSON.
func GetKafkaSettings() (KafkaSettings, error) {
	var settings KafkaSettings
	if err := json.Unmarshal([]byte(os.Getenv(targetSettings)), &settings); err != nil {
		return settings, fmt.Errorf("invalid Kafka TARGET_SETTINGS env variable")
	}
	return settings, nil
}

// GetPubSubSettings decodes the TARGET_SETTINGS environment variable as
// PubSubSettings JSON.
func GetPubSubSettings() (PubSubSettings, error) {
	var settings PubSubSettings
	if err := json.Unmarshal([]byte(os.Getenv(targetSettings)), &settings); err != nil {
		return settings, fmt.Errorf("invalid PubSubSettings TARGET_SETTINGS env variable")
	}
	return settings, nil
}

// KafkaSettings is the JSON shape of TARGET_SETTINGS for the "kafka" target.
type KafkaSettings struct {
	KafkaEndpoint           string `json:"kafka_endpoint"`
	MessageMaxBytes         int32  `json:"message_max_bytes"`
	SecurityProtocol        string `json:"security_protocol"`
	KeystoreLocation        string `json:"keystore_location"`
	KeystorePassword        string `json:"keystore_password"`
	CaLocation              string `json:"ssl_ca_location"`
	EnableCertVerification  bool   `json:"ssl_enable_cert_identification"`
	IdentificationAlgorithm string `json:"ssl_endpoint_identification_algorithm"`
}

// PubSubSettings is the JSON shape of TARGET_SETTINGS for the "pubsub"
// target.
type PubSubSettings struct {
	ProjectID string `json:"project_id"`
}

// getRoutesMap indexes the configured routes by ID.
func getRoutesMap(chirpConfig *Config) map[string]Route {
	// The parse error is deliberately ignored: whatever was decoded (possibly
	// nothing) is indexed, so invalid routing JSON degrades to fewer or no
	// routes rather than an error here.
	routeConfigs, _ := getRouteConfigs(chirpConfig)

	result := make(map[string]Route, len(routeConfigs.Routes))
	for _, route := range routeConfigs.Routes {
		result[route.ID] = route
	}
	return result
}

// getMessageRoutePerMessageType builds a fresh map from message type to its
// resolved MessageRoute. A message whose route ID is not configured gets an
// empty Topic and a BulkSize of 0.
func getMessageRoutePerMessageType(chirpConfig *Config) map[string]*MessageRoute {
	// The parse error is deliberately ignored: whatever was decoded (possibly
	// nothing) is used, and GetRouteConfig copes with absent entries.
	routeConfigs, _ := getRouteConfigs(chirpConfig)

	// Index the routes from the document parsed above instead of parsing the
	// same JSON a second time.
	routesByID := make(map[string]Route, len(routeConfigs.Routes))
	for _, route := range routeConfigs.Routes {
		routesByID[route.ID] = route
	}

	result := make(map[string]*MessageRoute, len(routeConfigs.Messages))
	for _, message := range routeConfigs.Messages {
		route := routesByID[message.Route]
		result[message.Type] = &MessageRoute{
			Type:        message.Type,
			Route:       message.Route,
			Compression: message.Compression,
			Signing:     message.Signing,
			Topic:       route.Topic,
			BulkSize:    route.BulkSize,
		}
	}
	return result
}

// getRouteConfigs decodes Config.MessageRouting as MessageRouteConfig JSON.
func getRouteConfigs(chirpConfig *Config) (MessageRouteConfig, error) {
	var result MessageRouteConfig
	if err := json.Unmarshal([]byte(chirpConfig.MessageRouting), &result); err != nil {
		return result, errors.New("invalid MESSAGE_ROUTING environment variable")
	}
	return result, nil
}

// OrganizationName returns the organization name loaded by
// ValidateConfiguration.
func OrganizationName() string {
	return cfg.OrganizationName
}

// OrganizationID returns the organization ID loaded by
// ValidateConfiguration.
func OrganizationID() string {
	return cfg.OrganizationID
}

// SiteID returns the site ID loaded by ValidateConfiguration.
func SiteID() string {
	return cfg.SiteID
}

// SiteName returns the site name loaded by ValidateConfiguration.
func SiteName() string {
	return cfg.SiteName
}

// GetKafkaClientConfig builds the Kafka client configuration from
// TARGET_SETTINGS and the producer timeout in chirpConfig. It returns an
// error when the Kafka settings cannot be decoded.
func GetKafkaClientConfig(chirpConfig *Config) (*kafkaclient.Config, error) {
	settings, err := GetKafkaSettings()
	if err != nil {
		return nil, err
	}

	// KafkaProducerTimeout is expressed in milliseconds.
	produceTimeout := time.Duration(chirpConfig.KafkaProducerTimeout) * time.Millisecond

	return &kafkaclient.Config{
		Brokers:         []string{settings.KafkaEndpoint},
		MessageMaxBytes: settings.MessageMaxBytes,
		ProduceTimeout:  produceTimeout,
		GroupID:         "data-sync___",
	}, nil
}