package kafkaclient import ( "errors" "fmt" "time" ) var ( errBrokerInvalid = errors.New("no value provided for broker") errTopicInvalid = errors.New("no value provided for topic") ) type Config struct { // Admin, Consumer, Producer Brokers []string TLS struct{} // tls.Config TODO configuration not used // Required for Consumer, // Optional for Producer, can specify when producing message Topic string // Producer MessageMaxBytes int32 ProduceTimeout time.Duration // Consumer GroupID string BulkSize int ReadTimeout time.Duration ReadWindowTimeout time.Duration ReturnOnEmptyFetch bool // return immediately if no record is fetched } func (c Config) ValidForAdmin() error { if len(c.Brokers) == 0 { return errBrokerInvalid } return nil } func (c Config) ValidForProducer() error { if len(c.Brokers) == 0 { return errBrokerInvalid } if c.MessageMaxBytes == 0 { return fmt.Errorf("no value provided for MessageMaxBytes") } if c.ProduceTimeout == 0 { return fmt.Errorf("no value provided for produce time out") } return nil } func (c Config) ValidForConsumer() error { if len(c.Brokers) == 0 { return errBrokerInvalid } if c.Topic == "" { return errTopicInvalid } if c.GroupID == "" { return fmt.Errorf("no value provided for consumer group") } if c.BulkSize == 0 { return fmt.Errorf("no value provided for consumer read bulk size") } if c.ReadTimeout == 0 { return fmt.Errorf("no value provided for consumer read time out") } if c.ReadWindowTimeout == 0 { return fmt.Errorf("no value provided for consumer read window timeout") } return nil }