...

Source file src/edge-infra.dev/pkg/edge/datasync/internal/config/ConfigProvider.go

Documentation: edge-infra.dev/pkg/edge/datasync/internal/config

     1  package config
     2  
     3  import (
     4  	"encoding/json"
     5  	"errors"
     6  	"flag"
     7  	"fmt"
     8  	"os"
     9  	"strings"
    10  	"time"
    11  
    12  	"github.com/peterbourgon/ff/v3"
    13  
    14  	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
    15  )
    16  
    17  // strings values for error messages and consts
    18  const (
    19  	outboxPath       = "OUTBOX_PATH"
    20  	messageRouting   = "MESSAGE_ROUTING"
    21  	sharedKey        = "DATA_SYNC_SHARED_KEY"
    22  	secretKey        = "DATA_SYNC_SECRET_KEY"
    23  	appKey           = "DATA_SYNC_APP_KEY"
    24  	targetSettings   = "TARGET_SETTINGS"
    25  	organizationID   = "ORGANIZATION_ID"
    26  	organizationName = "ORGANIZATION_NAME"
    27  	siteID           = "SITE_ID"
    28  	siteName         = "SITE_NAME"
    29  	target           = "TARGET"
    30  )
    31  
    32  // Config TODO: https://github.com/ncr-swt-retail/edge-roadmap/issues/10011
    33  type Config struct {
    34  	GrpcPort             string
    35  	MsgPort              string
    36  	LivenessPort         string
    37  	ReadinessPort        string
    38  	PrometheusPort       string
    39  	RunMessagingService  bool
    40  	KafkaProducerTimeout int
    41  	WorkerPolling        int
    42  	RunMessageWorker     bool
    43  	MessageRouting       string
    44  	Partition            int
    45  	OutboxPath           string
    46  }
    47  
    48  type DSConfig struct {
    49  	OrganizationID   string
    50  	OrganizationName string
    51  	SiteID           string
    52  	SiteName         string
    53  }
    54  
    55  var cfg DSConfig
    56  
    57  func (c *Config) BindFlags(fs *flag.FlagSet) {
    58  	fs.StringVar(
    59  		&c.GrpcPort,
    60  		"grpc-port",
    61  		"",
    62  		"GRPC Port Value")
    63  
    64  	fs.StringVar(
    65  		&c.MsgPort,
    66  		"msg-service-port",
    67  		"",
    68  		"Message Service Port")
    69  
    70  	fs.StringVar(
    71  		&c.LivenessPort,
    72  		"liveness-port",
    73  		"",
    74  		"Liveness Port")
    75  
    76  	fs.StringVar(
    77  		&c.ReadinessPort,
    78  		"readiness-port",
    79  		"",
    80  		"Readiness Port")
    81  
    82  	fs.StringVar(
    83  		&c.PrometheusPort,
    84  		"prometheus-port",
    85  		"",
    86  		"Prometheus Port")
    87  
    88  	fs.BoolVar(
    89  		&c.RunMessagingService,
    90  		"run-messaging-service",
    91  		true,
    92  		"Flag for running messaging service")
    93  
    94  	fs.IntVar(
    95  		&c.KafkaProducerTimeout,
    96  		"kafka-producer-timeout-ms",
    97  		60000,
    98  		"Value for the Kafka Producer Timeout in ms")
    99  
   100  	fs.IntVar(
   101  		&c.WorkerPolling,
   102  		"worker-polling",
   103  		5,
   104  		"Value for the Worker Polling")
   105  
   106  	fs.BoolVar(
   107  		&c.RunMessageWorker,
   108  		"run-message-worker",
   109  		true,
   110  		"Flag for running message worker")
   111  
   112  	fs.IntVar(
   113  		&c.Partition,
   114  		"partition",
   115  		5,
   116  		"Partition Value for Chirp Config")
   117  
   118  	fs.StringVar(
   119  		&c.MessageRouting,
   120  		"message-routing",
   121  		"",
   122  		"Value for the Message Routing")
   123  
   124  	fs.StringVar(
   125  		&c.OutboxPath,
   126  		"outbox-path",
   127  		"",
   128  		"Value for the Outbox Path")
   129  }
   130  
   131  type MessageRoute struct {
   132  	Type        string
   133  	Route       string
   134  	Topic       string
   135  	BulkSize    int
   136  	Compression bool
   137  	Signing     bool
   138  }
   139  
   140  type Message struct {
   141  	Type        string `json:"type"`
   142  	Compression bool   `json:"compression"`
   143  	Signing     bool   `json:"signing"`
   144  	Route       string `json:"route"`
   145  }
   146  type MessageRouteConfig struct {
   147  	Routes   []Route   `json:"routes"`
   148  	Messages []Message `json:"messages"`
   149  }
   150  type Route struct {
   151  	ID       string `json:"id"`
   152  	Topic    string `json:"topic"`
   153  	BulkSize int    `json:"bulk_size"`
   154  }
   155  type KafkaSSL struct {
   156  	KeystoreLocation        string
   157  	KeystorePassword        string
   158  	CaLocation              string
   159  	EnableCertVerification  string
   160  	IdentificationAlgorithm string
   161  }
   162  
   163  func (c *Config) Validate() error {
   164  	if c.GrpcPort == "" {
   165  		return fmt.Errorf("gRPC port cannot be empty")
   166  	}
   167  
   168  	if c.MsgPort == "" {
   169  		return fmt.Errorf("message service port cannot be empty")
   170  	}
   171  
   172  	if c.LivenessPort == "" {
   173  		return fmt.Errorf("liveness probe cannot be empty")
   174  	}
   175  
   176  	if c.ReadinessPort == "" {
   177  		return fmt.Errorf("readiness probe cannot be empty")
   178  	}
   179  
   180  	if c.PrometheusPort == "" {
   181  		return fmt.Errorf("prometheus port cannot be empty")
   182  	}
   183  
   184  	if c.KafkaProducerTimeout == 0 {
   185  		return fmt.Errorf("kafka producer timeout cannot be empty")
   186  	}
   187  
   188  	if c.WorkerPolling == 0 {
   189  		return fmt.Errorf("worker polling cannot be empty")
   190  	}
   191  	if c.Partition == 0 {
   192  		return fmt.Errorf("partition cannot be empty")
   193  	}
   194  
   195  	if c.MessageRouting == "" {
   196  		return fmt.Errorf("message routing cannot be empty")
   197  	}
   198  
   199  	if c.OutboxPath == "" {
   200  		return fmt.Errorf("outbox-path cannot be empty")
   201  	}
   202  	return nil
   203  }
   204  
   205  func NewConfig() (*Config, error) {
   206  	cfg := &Config{}
   207  
   208  	fs := flag.NewFlagSet("internal", flag.ExitOnError)
   209  
   210  	cfg.BindFlags(fs)
   211  
   212  	if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true)); err != nil {
   213  		return cfg, err
   214  	}
   215  
   216  	if err := cfg.Validate(); err != nil {
   217  		return cfg, err
   218  	}
   219  
   220  	return cfg, nil
   221  }
   222  
   223  func ValidateConfiguration() error {
   224  	result := make([]string, 0)
   225  
   226  	cfg.OrganizationID = os.Getenv(organizationID)
   227  	cfg.OrganizationName = os.Getenv(organizationName)
   228  	cfg.SiteID = os.Getenv(siteID)
   229  	cfg.SiteName = os.Getenv(siteName)
   230  
   231  	if cfg.OrganizationID == "" || cfg.OrganizationName == "" || cfg.SiteID == "" || cfg.SiteName == "" {
   232  		result = append(result, organizationID, organizationName, siteID, siteName)
   233  	}
   234  
   235  	if GetTarget() == "kafka" {
   236  		_, err := GetKafkaSettings()
   237  		if err != nil {
   238  			result = append(result, targetSettings)
   239  		}
   240  	}
   241  
   242  	if GetTarget() == "pubsub" {
   243  		_, err := GetPubSubSettings()
   244  		if err != nil {
   245  			result = append(result, targetSettings)
   246  		}
   247  	}
   248  
   249  	if len(result) > 0 {
   250  		return fmt.Errorf("missing the following environment variables: %v", strings.Join(result, ", "))
   251  	}
   252  
   253  	return nil
   254  }
   255  
   256  func GetRouteConfig(messageType string, chirpConfig *Config) *MessageRoute {
   257  	messageRouteMap := getMessageRoutePerMessageType(chirpConfig)
   258  
   259  	elem, ok := messageRouteMap[messageType]
   260  	if !ok {
   261  		r := messageRouteMap["*"]
   262  		r.Type = messageType
   263  		return r
   264  	}
   265  
   266  	return elem
   267  }
   268  
   269  func GetRoutes(chirpConfig *Config) []string {
   270  	var result []string
   271  
   272  	configs, err := getRouteConfigs(chirpConfig)
   273  	if err != nil {
   274  		return result
   275  	}
   276  
   277  	for i := 0; i < len(configs.Routes); i++ {
   278  		result = append(result, configs.Routes[i].ID)
   279  	}
   280  
   281  	return result
   282  }
   283  
   284  func GetBulkSizes(chirpConfig *Config) map[string]int {
   285  	bulkSizes := make(map[string]int)
   286  
   287  	configs, err := getRouteConfigs(chirpConfig)
   288  	if err != nil {
   289  		return bulkSizes
   290  	}
   291  
   292  	for i := 0; i < len(configs.Routes); i++ {
   293  		bulkSizes[configs.Routes[i].ID] = configs.Routes[i].BulkSize
   294  	}
   295  
   296  	return bulkSizes
   297  }
   298  
   299  func GetTopics(chirpConfig *Config) map[string]string {
   300  	topics := make(map[string]string)
   301  
   302  	configs, err := getRouteConfigs(chirpConfig)
   303  	if err != nil {
   304  		return topics
   305  	}
   306  
   307  	for i := 0; i < len(configs.Routes); i++ {
   308  		topics[configs.Routes[i].ID] = configs.Routes[i].Topic
   309  	}
   310  
   311  	return topics
   312  }
   313  
   314  func GetOutboxPath(chirpConfig *Config) string {
   315  	o := chirpConfig.OutboxPath
   316  
   317  	// calling code will assume no trailing slash in this path
   318  	if l := len(o) - 1; l >= 0 && o[l] == '/' {
   319  		o = o[:l]
   320  	}
   321  
   322  	return o
   323  }
   324  
   325  func ShouldCompressMessage(messageType string, chirpConfig *Config) bool {
   326  	cfg := GetRouteConfig(messageType, chirpConfig)
   327  	return cfg.Compression
   328  }
   329  
   330  func ShouldSignMessage(messageType string, chirpConfig *Config) bool {
   331  	cfg := GetRouteConfig(messageType, chirpConfig)
   332  	return cfg.Signing
   333  }
   334  
   335  func GetSharedKey() string {
   336  	return os.Getenv(sharedKey)
   337  }
   338  
   339  func GetSecretKey() string {
   340  	return os.Getenv(secretKey)
   341  }
   342  
   343  func GetAppKey() string {
   344  	return os.Getenv(appKey)
   345  }
   346  
   347  func GetTarget() string {
   348  	t := os.Getenv(target)
   349  	if t == "" {
   350  		return "kafka"
   351  	}
   352  
   353  	return t
   354  }
   355  
   356  func GetKafkaSettings() (KafkaSettings, error) {
   357  	var settings KafkaSettings
   358  
   359  	err := json.Unmarshal([]byte(os.Getenv("TARGET_SETTINGS")), &settings)
   360  	if err != nil {
   361  		return settings, fmt.Errorf("invalid Kafka TARGET_SETTINGS env variable")
   362  	}
   363  
   364  	return settings, nil
   365  }
   366  
   367  func GetPubSubSettings() (PubSubSettings, error) {
   368  	var settings PubSubSettings
   369  
   370  	err := json.Unmarshal([]byte(os.Getenv("TARGET_SETTINGS")), &settings)
   371  	if err != nil {
   372  		return settings, fmt.Errorf("invalid PubSubSettings TARGET_SETTINGS env variable")
   373  	}
   374  
   375  	return settings, nil
   376  }
   377  
   378  type KafkaSettings struct {
   379  	KafkaEndpoint           string `json:"kafka_endpoint"`
   380  	MessageMaxBytes         int32  `json:"message_max_bytes"`
   381  	SecurityProtocol        string `json:"security_protocol"`
   382  	KeystoreLocation        string `json:"keystore_location"`
   383  	KeystorePassword        string `json:"keystore_password"`
   384  	CaLocation              string `json:"ssl_ca_location"`
   385  	EnableCertVerification  bool   `json:"ssl_enable_cert_identification"`
   386  	IdentificationAlgorithm string `json:"ssl_endpoint_identification_algorithm"`
   387  }
   388  
   389  type PubSubSettings struct {
   390  	ProjectID string `json:"project_id"`
   391  }
   392  
   393  func getRoutesMap(chirpConfig *Config) map[string]Route {
   394  	result := make(map[string]Route)
   395  
   396  	routeConfigs, _ := getRouteConfigs(chirpConfig)
   397  
   398  	for _, route := range routeConfigs.Routes {
   399  		result[route.ID] = route
   400  	}
   401  
   402  	return result
   403  }
   404  
   405  func getMessageRoutePerMessageType(chirpConfig *Config) map[string]*MessageRoute {
   406  	result := make(map[string]*MessageRoute)
   407  
   408  	routeConfigs, _ := getRouteConfigs(chirpConfig)
   409  	routesMap := getRoutesMap(chirpConfig)
   410  
   411  	var messageRoute *MessageRoute
   412  	for _, message := range routeConfigs.Messages {
   413  		route := routesMap[message.Route]
   414  
   415  		messageRoute = &MessageRoute{
   416  			Type:        message.Type,
   417  			Route:       message.Route,
   418  			Compression: message.Compression,
   419  			Signing:     message.Signing,
   420  			Topic:       route.Topic,
   421  			BulkSize:    route.BulkSize,
   422  		}
   423  
   424  		result[message.Type] = messageRoute
   425  	}
   426  
   427  	return result
   428  }
   429  
   430  func getRouteConfigs(chirpConfig *Config) (MessageRouteConfig, error) {
   431  	var result MessageRouteConfig
   432  	err := json.Unmarshal([]byte(chirpConfig.MessageRouting), &result)
   433  	if err != nil {
   434  		return result, errors.New("invalid MESSAGE_ROUTING environment variable")
   435  	}
   436  	return result, nil
   437  }
   438  
   439  func OrganizationName() string {
   440  	return cfg.OrganizationName
   441  }
   442  
   443  func OrganizationID() string {
   444  	return cfg.OrganizationID
   445  }
   446  
   447  func SiteID() string {
   448  	return cfg.SiteID
   449  }
   450  
   451  func SiteName() string {
   452  	return cfg.SiteName
   453  }
   454  
   455  func GetKafkaClientConfig(chirpConfig *Config) (*kafkaclient.Config, error) {
   456  	settings, err := GetKafkaSettings()
   457  	if err != nil {
   458  		return nil, err
   459  	}
   460  	produceTimeOut := time.Duration(chirpConfig.KafkaProducerTimeout) * time.Millisecond
   461  	return &kafkaclient.Config{
   462  		Brokers:         []string{settings.KafkaEndpoint},
   463  		MessageMaxBytes: settings.MessageMaxBytes,
   464  		ProduceTimeout:  produceTimeOut,
   465  
   466  		GroupID: "data-sync___",
   467  	}, nil
   468  }
   469  

View as plain text