...
1 package kafkaclient
2
3 import (
4 "context"
5 "fmt"
6
7 "github.com/twmb/franz-go/pkg/kgo"
8 "github.com/twmb/franz-go/pkg/kversion"
9 )
10
11 var _ Producer = producer{}
12
13 type producer struct {
14 cfg *Config
15 client *kgo.Client
16 }
17
18
19
20
21 func NewProducer(cfg *Config) (Producer, error) {
22 if err := cfg.ValidForProducer(); err != nil {
23 return nil, err
24 }
25 opt := []kgo.Opt{kgo.SeedBrokers(cfg.Brokers...),
26 kgo.ProducerBatchMaxBytes(cfg.MessageMaxBytes),
27 kgo.AllowAutoTopicCreation(),
28 kgo.MinVersions(kversion.V2_1_0()),
29 }
30 if cfg.Topic != "" {
31 opt = append(opt, kgo.DefaultProduceTopic(cfg.Topic))
32 }
33 client, err := kgo.NewClient(opt...)
34 if err != nil {
35 return nil, err
36 }
37 return &producer{
38 cfg: cfg,
39 client: client,
40 }, nil
41 }
42
43
44 func (p producer) Ping(ctx context.Context) error {
45 return p.client.Ping(ctx)
46 }
47
48
49
50 func (p producer) Produce(ctx context.Context, records ...*kgo.Record) error {
51 if len(records) == 0 {
52 return fmt.Errorf("producer: no records to produce")
53 }
54 record := records[0]
55 if record.Topic == "" {
56 return fmt.Errorf("producer: no topic specified: %w", errTopicInvalid)
57 }
58 var cancel func()
59 if ctx == nil {
60 ctx, cancel = context.WithTimeout(context.Background(), p.cfg.ProduceTimeout)
61 defer cancel()
62 }
63 res := p.client.ProduceSync(ctx, records...)
64 if err := res.FirstErr(); err != nil {
65 return fmt.Errorf("producer: fail to produce message: %w", err)
66 }
67 return nil
68 }
69
70
71
72 func (p producer) ReProduce(ctx context.Context, records ...*kgo.Record) error {
73 if len(records) == 0 {
74 return fmt.Errorf("reproduce: no records to produce")
75 }
76 if p.cfg.Topic == "" {
77 return fmt.Errorf("reproduce: no topic specified: %w", errTopicInvalid)
78 }
79 records = p.copy(records)
80 var cancel func()
81 if ctx == nil {
82 ctx, cancel = context.WithTimeout(context.Background(), p.cfg.ProduceTimeout)
83 defer cancel()
84 }
85 res := p.client.ProduceSync(ctx, records...)
86 if err := res.FirstErr(); err != nil {
87 return fmt.Errorf("reproduce: fail to produce message: %w", err)
88 }
89 return nil
90 }
91
92
93 func (p producer) Stop() {
94 p.client.Close()
95 }
96
97 func (p producer) copy(records []*kgo.Record) []*kgo.Record {
98 l := len(records)
99 ret := make([]*kgo.Record, l)
100 topic := p.cfg.Topic
101 for i := 0; i < l; i++ {
102 ret[i] = copyRecord(records[i], topic)
103 }
104 return ret
105 }
106
107 func copyRecord(record *kgo.Record, topic string) *kgo.Record {
108 headers := make([]kgo.RecordHeader, len(record.Headers))
109 copy(headers, record.Headers)
110 return &kgo.Record{
111 Topic: topic,
112 Key: record.Key,
113 Value: record.Value,
114 Headers: headers,
115 }
116 }
117
View as plain text