...
1 package kafkaclient
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7
8 "github.com/twmb/franz-go/pkg/kgo"
9 "github.com/twmb/franz-go/pkg/kversion"
10 )
11
// Compile-time check that the consumer value type implements Consumer.
var _ Consumer = consumer{}
13
// consumer is the Consumer implementation in this file. It pairs the
// configuration handed to NewConsumer with the franz-go client built from it.
type consumer struct {
	// cfg is the configuration passed to NewConsumer; Read takes its
	// timeouts and batching settings from it.
	cfg *Config
	// client is the franz-go client that every method delegates to.
	client *kgo.Client
}
18
19
20
21 func NewConsumer(cfg *Config) (Consumer, error) {
22 if err := cfg.ValidForConsumer(); err != nil {
23 return nil, err
24 }
25 client, err := kgo.NewClient(
26 kgo.SeedBrokers(cfg.Brokers...),
27 kgo.ConsumerGroup(fmt.Sprintf("%s%s", cfg.GroupID, cfg.Topic)),
28 kgo.ConsumeTopics(cfg.Topic),
29 kgo.DisableAutoCommit(),
30 kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
31 kgo.MinVersions(kversion.V2_1_0()),
32 kgo.AllowAutoTopicCreation(),
33 kgo.BlockRebalanceOnPoll(),
34 )
35 if err != nil {
36 return nil, err
37 }
38 return &consumer{
39 cfg: cfg,
40 client: client,
41 }, nil
42 }
43
44
// Ping delegates to the underlying client's Ping with ctx and returns its
// error unchanged.
func (c consumer) Ping(ctx context.Context) error {
	return c.client.Ping(ctx)
}
48
49
50 func (c consumer) Read(ctx context.Context) ([]*kgo.Record, error) {
51 if ctx == nil {
52 ctx = context.Background()
53 }
54 ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout)
55 defer cancel()
56
57 windowCtx, windowCancel := context.WithTimeout(context.Background(), c.cfg.ReadWindowTimeout)
58 defer windowCancel()
59
60 var records []*kgo.Record
61 defer c.client.AllowRebalance()
62 loop:
63 for {
64 select {
65 default:
66 fetches := c.client.PollFetches(ctx)
67 if fetches.IsClientClosed() {
68 return records, nil
69 }
70 if err := fetches.Err(); err != nil && !errors.Is(err, context.DeadlineExceeded) {
71 return nil, err
72 }
73 if c.cfg.ReturnOnEmptyFetch && fetches.NumRecords() == 0 {
74 return records, nil
75 }
76 records = append(records, fetches.Records()...)
77 if len(records) > c.cfg.BulkSize {
78 break loop
79 }
80 case <-windowCtx.Done():
81 break loop
82 }
83 }
84 return records, nil
85 }
86
87
// Ack commits the given records through the client's CommitRecords and
// returns its error unchanged.
func (c consumer) Ack(ctx context.Context, records ...*kgo.Record) error {
	// ctx is forwarded as-is; unlike Read, no nil check is applied here.
	return c.client.CommitRecords(ctx, records...)
}
92
93
// Stop shuts the consumer down by calling CloseAllowingRebalance on the
// underlying client.
func (c consumer) Stop() {
	c.client.CloseAllowingRebalance()
}
97
View as plain text