...

Source file src/edge-infra.dev/pkg/edge/datasync/kafkaclient/config.go

Documentation: edge-infra.dev/pkg/edge/datasync/kafkaclient

     1  package kafkaclient
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"time"
     7  )
     8  
     9  var (
    10  	errBrokerInvalid = errors.New("no value provided for broker")
    11  	errTopicInvalid  = errors.New("no value provided for topic")
    12  )
    13  
    14  type Config struct {
    15  	// Admin, Consumer, Producer
    16  	Brokers []string
    17  	TLS     struct{} // tls.Config TODO configuration not used
    18  
    19  	// Required for Consumer,
    20  	// Optional for Producer, can specify when producing message
    21  	Topic string
    22  
    23  	// Producer
    24  	MessageMaxBytes int32
    25  	ProduceTimeout  time.Duration
    26  
    27  	// Consumer
    28  	GroupID            string
    29  	BulkSize           int
    30  	ReadTimeout        time.Duration
    31  	ReadWindowTimeout  time.Duration
    32  	ReturnOnEmptyFetch bool // return immediately if no record is fetched
    33  }
    34  
    35  func (c Config) ValidForAdmin() error {
    36  	if len(c.Brokers) == 0 {
    37  		return errBrokerInvalid
    38  	}
    39  	return nil
    40  }
    41  
    42  func (c Config) ValidForProducer() error {
    43  	if len(c.Brokers) == 0 {
    44  		return errBrokerInvalid
    45  	}
    46  	if c.MessageMaxBytes == 0 {
    47  		return fmt.Errorf("no value provided for MessageMaxBytes")
    48  	}
    49  	if c.ProduceTimeout == 0 {
    50  		return fmt.Errorf("no value provided for produce time out")
    51  	}
    52  	return nil
    53  }
    54  
    55  func (c Config) ValidForConsumer() error {
    56  	if len(c.Brokers) == 0 {
    57  		return errBrokerInvalid
    58  	}
    59  	if c.Topic == "" {
    60  		return errTopicInvalid
    61  	}
    62  
    63  	if c.GroupID == "" {
    64  		return fmt.Errorf("no value provided for consumer group")
    65  	}
    66  	if c.BulkSize == 0 {
    67  		return fmt.Errorf("no value provided for consumer read bulk size")
    68  	}
    69  	if c.ReadTimeout == 0 {
    70  		return fmt.Errorf("no value provided for consumer read time out")
    71  	}
    72  	if c.ReadWindowTimeout == 0 {
    73  		return fmt.Errorf("no value provided for consumer read window timeout")
    74  	}
    75  	return nil
    76  }
    77  

View as plain text