...

Source file src/edge-infra.dev/pkg/edge/datasync/shoot/config/config_provider.go

Documentation: edge-infra.dev/pkg/edge/datasync/shoot/config

     1  package config
     2  
     3  import (
     4  	"encoding/json"
     5  	"flag"
     6  	"fmt"
     7  	"os"
     8  	"time"
     9  
    10  	"github.com/peterbourgon/ff/v3"
    11  
    12  	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
    13  )
    14  
    15  type TopicMapping struct {
    16  	Source string `json:"source"`
    17  	Target string `json:"target"`
    18  }
    19  
    20  type KafkaSSL struct {
    21  	KeystoreLocation        string
    22  	KeystorePassword        string
    23  	CaLocation              string
    24  	EnableCertVerification  string
    25  	IdentificationAlgorithm string
    26  }
    27  type Config struct {
    28  	KafkaConsumerTimedWindowMS time.Duration
    29  	PubsubBulkSize             int
    30  	KafkaBulkSize              int
    31  	KafkaConsumerReadTimeoutMS time.Duration
    32  	KafkaProducerTimeout       time.Duration
    33  	PubsubByteThreshold        int
    34  	ForemanProjectID           string
    35  }
    36  
    37  func (c *Config) BindFlags(fs *flag.FlagSet) {
    38  	fs.DurationVar(
    39  		&c.KafkaConsumerTimedWindowMS,
    40  		"kafka-consumer-timed-window-ms",
    41  		10000,
    42  		"Kafka Consumer Time",
    43  	)
    44  
    45  	fs.IntVar(
    46  		&c.PubsubBulkSize,
    47  		"pubsub-bulk-size",
    48  		100,
    49  		"Bulk Size for Pubsub")
    50  
    51  	fs.IntVar(
    52  		&c.KafkaBulkSize,
    53  		"kafka-bulk-size",
    54  		100,
    55  		"Bulk Size for Kafka")
    56  
    57  	fs.DurationVar(
    58  		&c.KafkaConsumerReadTimeoutMS,
    59  		"kafka-consumer-read-timeout-ms",
    60  		5000,
    61  		"Consumer Read Timeout for Kafka in ms")
    62  
    63  	fs.DurationVar(
    64  		&c.KafkaProducerTimeout,
    65  		"kafka-producer-timeout",
    66  		60_000,
    67  		"Producer Timeout for Kafka in ms")
    68  
    69  	fs.IntVar(
    70  		&c.PubsubByteThreshold,
    71  		"pubsub-byte-threshold",
    72  		1e6,
    73  		"Pubsub Byte Threshold")
    74  
    75  	fs.StringVar(
    76  		&c.ForemanProjectID,
    77  		"project-id",
    78  		"",
    79  		"Foreman Project ID")
    80  }
    81  
    82  func NewConfig() (*Config, error) {
    83  	cfg := &Config{}
    84  
    85  	fs := flag.NewFlagSet("shoot", flag.ExitOnError)
    86  
    87  	cfg.BindFlags(fs)
    88  
    89  	if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true)); err != nil {
    90  		return cfg, err
    91  	}
    92  
    93  	if err := cfg.Validate(); err != nil {
    94  		return cfg, err
    95  	}
    96  	return cfg, nil
    97  }
    98  
    99  func (c *Config) Validate() error {
   100  	if c.KafkaProducerTimeout == 0 {
   101  		return fmt.Errorf("missing KAFKA_CONSUMER_TIMED_WINDOW_MS environment variable")
   102  	}
   103  
   104  	if c.PubsubBulkSize == 0 {
   105  		return fmt.Errorf("missing PUBSUB_BULK_SIZE environment variable")
   106  	}
   107  
   108  	if c.KafkaBulkSize == 0 {
   109  		return fmt.Errorf("missing KAFKA_BULK_SIZE environment variable")
   110  	}
   111  
   112  	if c.KafkaConsumerReadTimeoutMS == 0 {
   113  		return fmt.Errorf("missing KAFKA_CONSUMER_READ_TIMEOUT_MS environment variable")
   114  	}
   115  
   116  	if c.KafkaProducerTimeout == 0 {
   117  		return fmt.Errorf("missing KAFKA_PRODUCER_TIMEOUT_MS environment variable")
   118  	}
   119  
   120  	if c.PubsubByteThreshold == 0 {
   121  		return fmt.Errorf("missing PUBSUB_BYTE_THRESHOLD environment variable")
   122  	}
   123  
   124  	if c.ForemanProjectID == "" {
   125  		return fmt.Errorf("missing PROJECT_ID environment variable")
   126  	}
   127  
   128  	return nil
   129  }
   130  
   131  // GetTopicMappings returns environment variable TOPICS_MAPPING
   132  func GetTopicMappings() ([]*TopicMapping, error) {
   133  	mappings := os.Getenv("TOPICS_MAPPING")
   134  
   135  	var res []*TopicMapping
   136  
   137  	err := json.Unmarshal([]byte(mappings), &res)
   138  	if err != nil {
   139  		return nil, fmt.Errorf("invalid TOPICS_MAPPING environment variable")
   140  	}
   141  
   142  	return res, nil
   143  }
   144  
   145  // GetTargetTopic returns the Google Pub/Sub topic name which is the target topic for Kafka messages in sourceTopic
   146  func GetTargetTopic(sourceTopic string) string {
   147  	mappings, _ := GetTopicMappings()
   148  
   149  	for _, mapping := range mappings {
   150  		if mapping.Source == sourceTopic {
   151  			return mapping.Target
   152  		}
   153  	}
   154  
   155  	return ""
   156  }
   157  
   158  // GetKafkaSettings returns environment variable KAFKA_SETTINGS
   159  func GetKafkaSettings() (KafkaSettings, error) {
   160  	var settings KafkaSettings
   161  
   162  	err := json.Unmarshal([]byte(os.Getenv("KAFKA_SETTINGS")), &settings)
   163  	if err != nil {
   164  		return settings, fmt.Errorf("invalid KAFKA_SETTINGS env variable")
   165  	}
   166  
   167  	return settings, nil
   168  }
   169  
   170  func GetKafkaBulkSize(cfg *Config) int {
   171  	return cfg.KafkaBulkSize
   172  }
   173  
   174  func GetKafkaClientConfig(cfg *Config) (*kafkaclient.Config, error) {
   175  	settings, err := GetKafkaSettings()
   176  	if err != nil {
   177  		return nil, err
   178  	}
   179  	readTimeOut := cfg.KafkaConsumerReadTimeoutMS * time.Millisecond
   180  	windowTimeOut := cfg.KafkaConsumerTimedWindowMS * time.Millisecond
   181  	produceTimeOut := cfg.KafkaProducerTimeout * time.Millisecond
   182  
   183  	return &kafkaclient.Config{
   184  		Brokers:           []string{settings.KafkaEndpoint},
   185  		MessageMaxBytes:   settings.MessageMaxBytes,
   186  		ProduceTimeout:    produceTimeOut,
   187  		GroupID:           "data-sync___",
   188  		BulkSize:          cfg.KafkaBulkSize,
   189  		ReadTimeout:       readTimeOut,
   190  		ReadWindowTimeout: windowTimeOut,
   191  	}, nil
   192  }
   193  
   194  type KafkaSettings struct {
   195  	KafkaEndpoint           string `json:"kafka_endpoint"`
   196  	MessageMaxBytes         int32  `json:"message_max_bytes"`
   197  	SecurityProtocol        string `json:"security_protocol"`
   198  	KeystoreLocation        string `json:"keystore_location"`
   199  	KeystorePassword        string `json:"keystore_password"`
   200  	CaLocation              string `json:"ssl_ca_location"`
   201  	EnableCertVerification  bool   `json:"ssl_enable_cert_identification"`
   202  	IdentificationAlgorithm string `json:"ssl_endpoint_identification_algorithm"`
   203  }
   204  
   205  type PubSubSettings struct {
   206  	ProjectID string `json:"project_id"`
   207  }
   208  

View as plain text