...
1 package kafkaclient
2
3 import (
4 "errors"
5 "fmt"
6 "time"
7 )
8
// errBrokerInvalid is the sentinel error reported when a configuration
// carries no broker entries.
var errBrokerInvalid = errors.New("no value provided for broker")

// errTopicInvalid is the sentinel error reported when a configuration
// carries no topic name.
var errTopicInvalid = errors.New("no value provided for topic")
13
// Config gathers the settings consumed by the ValidForAdmin,
// ValidForProducer and ValidForConsumer checks. Which fields are mandatory
// depends on the role being validated.
type Config struct {
	// Brokers holds the broker entries to connect to. Every ValidFor* check
	// rejects an empty list.
	Brokers []string
	// TLS is an empty struct in this definition, so it carries no settings.
	TLS struct{}

	// Topic names the topic to use. ValidForConsumer rejects an empty value.
	Topic string

	// MessageMaxBytes is a size limit for produced messages.
	// ValidForProducer rejects a zero value.
	MessageMaxBytes int32
	// ProduceTimeout bounds a produce operation. ValidForProducer rejects a
	// zero value.
	ProduceTimeout time.Duration

	// GroupID is the consumer group identifier. ValidForConsumer rejects an
	// empty value.
	GroupID string
	// BulkSize is the consumer read bulk size. ValidForConsumer rejects a
	// zero value.
	BulkSize int
	// ReadTimeout bounds a consumer read. ValidForConsumer rejects a zero
	// value.
	ReadTimeout time.Duration
	// ReadWindowTimeout is the consumer read window timeout.
	// ValidForConsumer rejects a zero value.
	ReadWindowTimeout time.Duration
	// ReturnOnEmptyFetch is not examined by ValidForConsumer.
	// NOTE(review): presumably makes a read return when a fetch yields no
	// records — confirm against the consumer implementation.
	ReturnOnEmptyFetch bool
}
34
35 func (c Config) ValidForAdmin() error {
36 if len(c.Brokers) == 0 {
37 return errBrokerInvalid
38 }
39 return nil
40 }
41
42 func (c Config) ValidForProducer() error {
43 if len(c.Brokers) == 0 {
44 return errBrokerInvalid
45 }
46 if c.MessageMaxBytes == 0 {
47 return fmt.Errorf("no value provided for MessageMaxBytes")
48 }
49 if c.ProduceTimeout == 0 {
50 return fmt.Errorf("no value provided for produce time out")
51 }
52 return nil
53 }
54
55 func (c Config) ValidForConsumer() error {
56 if len(c.Brokers) == 0 {
57 return errBrokerInvalid
58 }
59 if c.Topic == "" {
60 return errTopicInvalid
61 }
62
63 if c.GroupID == "" {
64 return fmt.Errorf("no value provided for consumer group")
65 }
66 if c.BulkSize == 0 {
67 return fmt.Errorf("no value provided for consumer read bulk size")
68 }
69 if c.ReadTimeout == 0 {
70 return fmt.Errorf("no value provided for consumer read time out")
71 }
72 if c.ReadWindowTimeout == 0 {
73 return fmt.Errorf("no value provided for consumer read window timeout")
74 }
75 return nil
76 }
77
View as plain text