1 package config
2
3 import (
4 "encoding/json"
5 "flag"
6 "fmt"
7 "os"
8 "time"
9
10 "github.com/peterbourgon/ff/v3"
11
12 "edge-infra.dev/pkg/edge/datasync/kafkaclient"
13 )
14
// TopicMapping pairs a source topic name with the target topic name it maps
// to. GetTopicMappings decodes a JSON array of these from the TOPICS_MAPPING
// environment variable.
type TopicMapping struct {
	// Source is the topic name GetTargetTopic compares its argument against.
	Source string `json:"source"`

	// Target is the topic name returned when Source matches.
	Target string `json:"target"`
}
19
// KafkaSSL groups SSL-related Kafka options, all held as strings. It carries
// no JSON tags and is not referenced by the other definitions visible in this
// file.
//
// NOTE(review): EnableCertVerification is a string here but a bool on
// KafkaSettings — confirm which representation is intended.
type KafkaSSL struct {
	// KeystoreLocation presumably is a path to the client keystore — confirm against callers.
	KeystoreLocation string

	// KeystorePassword presumably unlocks the keystore at KeystoreLocation — confirm against callers.
	KeystorePassword string

	// CaLocation presumably is a path to the CA certificate — confirm against callers.
	CaLocation string

	// EnableCertVerification is kept as a string; its accepted values are not visible here.
	EnableCertVerification string

	// IdentificationAlgorithm is kept as a string; its accepted values are not visible here.
	IdentificationAlgorithm string
}
// Config holds the tunables that BindFlags registers as flags and that
// Validate requires to be non-zero.
//
// The three time.Duration fields carry a plain count of milliseconds rather
// than a real duration: BindFlags defaults them to bare integers and
// GetKafkaClientConfig multiplies each one by time.Millisecond before use.
type Config struct {
	// KafkaConsumerTimedWindowMS is a millisecond count; it becomes the
	// Kafka client's ReadWindowTimeout.
	KafkaConsumerTimedWindowMS time.Duration

	// PubsubBulkSize is set by the "pubsub-bulk-size" flag.
	PubsubBulkSize int

	// KafkaBulkSize is set by the "kafka-bulk-size" flag and becomes the
	// Kafka client's BulkSize.
	KafkaBulkSize int

	// KafkaConsumerReadTimeoutMS is a millisecond count; it becomes the
	// Kafka client's ReadTimeout.
	KafkaConsumerReadTimeoutMS time.Duration

	// KafkaProducerTimeout is a millisecond count; it becomes the Kafka
	// client's ProduceTimeout.
	KafkaProducerTimeout time.Duration

	// PubsubByteThreshold is set by the "pubsub-byte-threshold" flag.
	PubsubByteThreshold int

	// ForemanProjectID is set by the "project-id" flag.
	ForemanProjectID string
}
36
37 func (c *Config) BindFlags(fs *flag.FlagSet) {
38 fs.DurationVar(
39 &c.KafkaConsumerTimedWindowMS,
40 "kafka-consumer-timed-window-ms",
41 10000,
42 "Kafka Consumer Time",
43 )
44
45 fs.IntVar(
46 &c.PubsubBulkSize,
47 "pubsub-bulk-size",
48 100,
49 "Bulk Size for Pubsub")
50
51 fs.IntVar(
52 &c.KafkaBulkSize,
53 "kafka-bulk-size",
54 100,
55 "Bulk Size for Kafka")
56
57 fs.DurationVar(
58 &c.KafkaConsumerReadTimeoutMS,
59 "kafka-consumer-read-timeout-ms",
60 5000,
61 "Consumer Read Timeout for Kafka in ms")
62
63 fs.DurationVar(
64 &c.KafkaProducerTimeout,
65 "kafka-producer-timeout",
66 60_000,
67 "Producer Timeout for Kafka in ms")
68
69 fs.IntVar(
70 &c.PubsubByteThreshold,
71 "pubsub-byte-threshold",
72 1e6,
73 "Pubsub Byte Threshold")
74
75 fs.StringVar(
76 &c.ForemanProjectID,
77 "project-id",
78 "",
79 "Foreman Project ID")
80 }
81
82 func NewConfig() (*Config, error) {
83 cfg := &Config{}
84
85 fs := flag.NewFlagSet("shoot", flag.ExitOnError)
86
87 cfg.BindFlags(fs)
88
89 if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true)); err != nil {
90 return cfg, err
91 }
92
93 if err := cfg.Validate(); err != nil {
94 return cfg, err
95 }
96 return cfg, nil
97 }
98
99 func (c *Config) Validate() error {
100 if c.KafkaProducerTimeout == 0 {
101 return fmt.Errorf("missing KAFKA_CONSUMER_TIMED_WINDOW_MS environment variable")
102 }
103
104 if c.PubsubBulkSize == 0 {
105 return fmt.Errorf("missing PUBSUB_BULK_SIZE environment variable")
106 }
107
108 if c.KafkaBulkSize == 0 {
109 return fmt.Errorf("missing KAFKA_BULK_SIZE environment variable")
110 }
111
112 if c.KafkaConsumerReadTimeoutMS == 0 {
113 return fmt.Errorf("missing KAFKA_CONSUMER_READ_TIMEOUT_MS environment variable")
114 }
115
116 if c.KafkaProducerTimeout == 0 {
117 return fmt.Errorf("missing KAFKA_PRODUCER_TIMEOUT_MS environment variable")
118 }
119
120 if c.PubsubByteThreshold == 0 {
121 return fmt.Errorf("missing PUBSUB_BYTE_THRESHOLD environment variable")
122 }
123
124 if c.ForemanProjectID == "" {
125 return fmt.Errorf("missing PROJECT_ID environment variable")
126 }
127
128 return nil
129 }
130
131
132 func GetTopicMappings() ([]*TopicMapping, error) {
133 mappings := os.Getenv("TOPICS_MAPPING")
134
135 var res []*TopicMapping
136
137 err := json.Unmarshal([]byte(mappings), &res)
138 if err != nil {
139 return nil, fmt.Errorf("invalid TOPICS_MAPPING environment variable")
140 }
141
142 return res, nil
143 }
144
145
146 func GetTargetTopic(sourceTopic string) string {
147 mappings, _ := GetTopicMappings()
148
149 for _, mapping := range mappings {
150 if mapping.Source == sourceTopic {
151 return mapping.Target
152 }
153 }
154
155 return ""
156 }
157
158
159 func GetKafkaSettings() (KafkaSettings, error) {
160 var settings KafkaSettings
161
162 err := json.Unmarshal([]byte(os.Getenv("KAFKA_SETTINGS")), &settings)
163 if err != nil {
164 return settings, fmt.Errorf("invalid KAFKA_SETTINGS env variable")
165 }
166
167 return settings, nil
168 }
169
170 func GetKafkaBulkSize(cfg *Config) int {
171 return cfg.KafkaBulkSize
172 }
173
174 func GetKafkaClientConfig(cfg *Config) (*kafkaclient.Config, error) {
175 settings, err := GetKafkaSettings()
176 if err != nil {
177 return nil, err
178 }
179 readTimeOut := cfg.KafkaConsumerReadTimeoutMS * time.Millisecond
180 windowTimeOut := cfg.KafkaConsumerTimedWindowMS * time.Millisecond
181 produceTimeOut := cfg.KafkaProducerTimeout * time.Millisecond
182
183 return &kafkaclient.Config{
184 Brokers: []string{settings.KafkaEndpoint},
185 MessageMaxBytes: settings.MessageMaxBytes,
186 ProduceTimeout: produceTimeOut,
187 GroupID: "data-sync___",
188 BulkSize: cfg.KafkaBulkSize,
189 ReadTimeout: readTimeOut,
190 ReadWindowTimeout: windowTimeOut,
191 }, nil
192 }
193
// KafkaSettings is the JSON shape that GetKafkaSettings decodes from the
// KAFKA_SETTINGS environment variable. Among the definitions visible in this
// file, only KafkaEndpoint and MessageMaxBytes are read (by
// GetKafkaClientConfig).
type KafkaSettings struct {
	// KafkaEndpoint is used as the single broker address (JSON "kafka_endpoint").
	KafkaEndpoint string `json:"kafka_endpoint"`

	// MessageMaxBytes is passed through to the Kafka client (JSON "message_max_bytes").
	MessageMaxBytes int32 `json:"message_max_bytes"`

	// SecurityProtocol is decoded from JSON "security_protocol".
	SecurityProtocol string `json:"security_protocol"`

	// KeystoreLocation is decoded from JSON "keystore_location".
	KeystoreLocation string `json:"keystore_location"`

	// KeystorePassword is decoded from JSON "keystore_password".
	KeystorePassword string `json:"keystore_password"`

	// CaLocation is decoded from JSON "ssl_ca_location".
	CaLocation string `json:"ssl_ca_location"`

	// EnableCertVerification is decoded from JSON "ssl_enable_cert_identification".
	EnableCertVerification bool `json:"ssl_enable_cert_identification"`

	// IdentificationAlgorithm is decoded from JSON "ssl_endpoint_identification_algorithm".
	IdentificationAlgorithm string `json:"ssl_endpoint_identification_algorithm"`
}
204
// PubSubSettings carries a single project identifier under the JSON key
// "project_id". It is not referenced by the other definitions visible in this
// file.
type PubSubSettings struct {
	// ProjectID is encoded and decoded as JSON "project_id".
	ProjectID string `json:"project_id"`
}
208
View as plain text