// Package config binds, parses and validates the flag- and
// environment-driven settings declared in this file, and decodes the
// JSON-valued TOPICS_MAPPING and KAFKA_SETTINGS environment variables.
package config

import (
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"time"

	"github.com/peterbourgon/ff/v3"

	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
)

// TopicMapping pairs a source topic name with the target topic name it maps to.
type TopicMapping struct {
	Source string `json:"source"`
	Target string `json:"target"`
}

// KafkaSSL groups SSL-related Kafka settings. Every field is a string here,
// including EnableCertVerification, which is a bool on KafkaSettings.
type KafkaSSL struct {
	KeystoreLocation        string
	KeystorePassword        string
	CaLocation              string
	EnableCertVerification  string
	IdentificationAlgorithm string
}

// Config holds the settings registered by BindFlags.
//
// The three time.Duration fields carry a count of milliseconds rather than a
// real duration: GetKafkaClientConfig multiplies each of them by
// time.Millisecond before handing them to the Kafka client.
type Config struct {
	// KafkaConsumerTimedWindowMS is scaled into kafkaclient.Config.ReadWindowTimeout.
	KafkaConsumerTimedWindowMS time.Duration

	// PubsubBulkSize is the value of the pubsub-bulk-size flag.
	PubsubBulkSize int

	// KafkaBulkSize is copied into kafkaclient.Config.BulkSize.
	KafkaBulkSize int

	// KafkaConsumerReadTimeoutMS is scaled into kafkaclient.Config.ReadTimeout.
	KafkaConsumerReadTimeoutMS time.Duration

	// KafkaProducerTimeout is scaled into kafkaclient.Config.ProduceTimeout.
	KafkaProducerTimeout time.Duration

	// PubsubByteThreshold is the value of the pubsub-byte-threshold flag.
	PubsubByteThreshold int

	// ForemanProjectID is the value of the project-id flag.
	ForemanProjectID string
}

// BindFlags registers every Config field on fs together with its default.
//
// The duration defaults are bare integers, so they are stored as that many
// nanoseconds and only become the intended durations once
// GetKafkaClientConfig multiplies them by time.Millisecond.
// NOTE(review): values supplied by the user go through the flag package's
// duration parsing, which expects a unit suffix; a value such as "10s" would
// then be scaled by time.Millisecond a second time. Confirm how deployments
// override these settings before changing the representation.
func (c *Config) BindFlags(fs *flag.FlagSet) {
	fs.DurationVar(&c.KafkaConsumerTimedWindowMS, "kafka-consumer-timed-window-ms", 10000, "Kafka Consumer Time")
	fs.IntVar(&c.PubsubBulkSize, "pubsub-bulk-size", 100, "Bulk Size for Pubsub")
	fs.IntVar(&c.KafkaBulkSize, "kafka-bulk-size", 100, "Bulk Size for Kafka")
	fs.DurationVar(&c.KafkaConsumerReadTimeoutMS, "kafka-consumer-read-timeout-ms", 5000, "Consumer Read Timeout for Kafka in ms")
	fs.DurationVar(&c.KafkaProducerTimeout, "kafka-producer-timeout", 60_000, "Producer Timeout for Kafka in ms")
	fs.IntVar(&c.PubsubByteThreshold, "pubsub-byte-threshold", 1e6, "Pubsub Byte Threshold")
	fs.StringVar(&c.ForemanProjectID, "project-id", "", "Foreman Project ID")
}

// NewConfig builds a Config from os.Args[1:] and the environment, then
// validates it.
//
// Parsing is delegated to ff.Parse with unprefixed environment variables
// enabled and undefined flags ignored. The (possibly partially populated)
// Config is returned alongside any parse or validation error, matching the
// behavior callers already observe.
func NewConfig() (*Config, error) {
	cfg := &Config{}
	fs := flag.NewFlagSet("shoot", flag.ExitOnError)
	cfg.BindFlags(fs)

	err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true))
	if err != nil {
		return cfg, err
	}
	if err := cfg.Validate(); err != nil {
		return cfg, err
	}
	return cfg, nil
}

// Validate reports the first setting that is still at its zero value, or nil
// when every setting is populated. Settings are checked in declaration order.
func (c *Config) Validate() error {
	switch {
	case c.KafkaConsumerTimedWindowMS == 0:
		// Previously this guard inspected KafkaProducerTimeout, so a zero
		// timed window was never caught.
		return fmt.Errorf("missing KAFKA_CONSUMER_TIMED_WINDOW_MS environment variable")
	case c.PubsubBulkSize == 0:
		return fmt.Errorf("missing PUBSUB_BULK_SIZE environment variable")
	case c.KafkaBulkSize == 0:
		return fmt.Errorf("missing KAFKA_BULK_SIZE environment variable")
	case c.KafkaConsumerReadTimeoutMS == 0:
		return fmt.Errorf("missing KAFKA_CONSUMER_READ_TIMEOUT_MS environment variable")
	case c.KafkaProducerTimeout == 0:
		// NOTE(review): the flag is registered as "kafka-producer-timeout"
		// (no "-ms" suffix) while this message names
		// KAFKA_PRODUCER_TIMEOUT_MS; confirm which name operators should set.
		return fmt.Errorf("missing KAFKA_PRODUCER_TIMEOUT_MS environment variable")
	case c.PubsubByteThreshold == 0:
		return fmt.Errorf("missing PUBSUB_BYTE_THRESHOLD environment variable")
	case c.ForemanProjectID == "":
		return fmt.Errorf("missing PROJECT_ID environment variable")
	}
	return nil
}

// GetTopicMappings decodes the TOPICS_MAPPING environment variable as a JSON
// array of TopicMapping objects. An unset, empty or malformed value yields an
// error that wraps the underlying JSON error.
func GetTopicMappings() ([]*TopicMapping, error) {
	raw := os.Getenv("TOPICS_MAPPING")

	var res []*TopicMapping
	if err := json.Unmarshal([]byte(raw), &res); err != nil {
		return nil, fmt.Errorf("invalid TOPICS_MAPPING environment variable: %w", err)
	}
	return res, nil
}

// GetTargetTopic returns the Google Pub/Sub topic name which is the target
// topic for Kafka messages in sourceTopic, or "" when no mapping matches.
//
// The lookup is best-effort: if TOPICS_MAPPING cannot be decoded the error is
// dropped and "" is returned. TOPICS_MAPPING is re-read on every call.
func GetTargetTopic(sourceTopic string) string {
	// Error intentionally ignored: on failure mappings is nil and the loop
	// below simply does not run.
	mappings, _ := GetTopicMappings()
	for _, mapping := range mappings {
		// A JSON null element decodes to a nil pointer; skip it instead of
		// dereferencing it.
		if mapping != nil && mapping.Source == sourceTopic {
			return mapping.Target
		}
	}
	return ""
}

// GetKafkaSettings decodes the KAFKA_SETTINGS environment variable as a JSON
// object into KafkaSettings. On failure it returns whatever was decoded so far
// together with an error that wraps the underlying JSON error.
func GetKafkaSettings() (KafkaSettings, error) {
	var settings KafkaSettings
	if err := json.Unmarshal([]byte(os.Getenv("KAFKA_SETTINGS")), &settings); err != nil {
		return settings, fmt.Errorf("invalid KAFKA_SETTINGS env variable: %w", err)
	}
	return settings, nil
}

// GetKafkaBulkSize returns cfg.KafkaBulkSize. cfg must not be nil.
func GetKafkaBulkSize(cfg *Config) int {
	return cfg.KafkaBulkSize
}

// GetKafkaClientConfig assembles a kafkaclient.Config from cfg and the
// KAFKA_SETTINGS environment variable.
//
// The broker list contains the single KafkaEndpoint from KAFKA_SETTINGS, the
// consumer group ID is fixed, and each timeout in cfg is multiplied by
// time.Millisecond because Config stores them as millisecond counts. An error
// from GetKafkaSettings is returned unchanged with a nil config.
func GetKafkaClientConfig(cfg *Config) (*kafkaclient.Config, error) {
	settings, err := GetKafkaSettings()
	if err != nil {
		return nil, err
	}

	return &kafkaclient.Config{
		Brokers:           []string{settings.KafkaEndpoint},
		MessageMaxBytes:   settings.MessageMaxBytes,
		ProduceTimeout:    cfg.KafkaProducerTimeout * time.Millisecond,
		GroupID:           "data-sync___",
		BulkSize:          cfg.KafkaBulkSize,
		ReadTimeout:       cfg.KafkaConsumerReadTimeoutMS * time.Millisecond,
		ReadWindowTimeout: cfg.KafkaConsumerTimedWindowMS * time.Millisecond,
	}, nil
}

// KafkaSettings is the JSON shape decoded from the KAFKA_SETTINGS environment
// variable. Only KafkaEndpoint and MessageMaxBytes are consumed within this
// file (by GetKafkaClientConfig).
type KafkaSettings struct {
	KafkaEndpoint           string `json:"kafka_endpoint"`
	MessageMaxBytes         int32  `json:"message_max_bytes"`
	SecurityProtocol        string `json:"security_protocol"`
	KeystoreLocation        string `json:"keystore_location"`
	KeystorePassword        string `json:"keystore_password"`
	CaLocation              string `json:"ssl_ca_location"`
	EnableCertVerification  bool   `json:"ssl_enable_cert_identification"`
	IdentificationAlgorithm string `json:"ssl_endpoint_identification_algorithm"`
}

// PubSubSettings carries a project ID under the JSON key "project_id".
type PubSubSettings struct {
	ProjectID string `json:"project_id"`
}