package kafkaclient import ( "context" "errors" "fmt" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kversion" ) var _ Consumer = consumer{} type consumer struct { cfg *Config client *kgo.Client } // NewConsumer creates topic if not available // Topic is automatically created func NewConsumer(cfg *Config) (Consumer, error) { if err := cfg.ValidForConsumer(); err != nil { return nil, err } client, err := kgo.NewClient( kgo.SeedBrokers(cfg.Brokers...), kgo.ConsumerGroup(fmt.Sprintf("%s%s", cfg.GroupID, cfg.Topic)), kgo.ConsumeTopics(cfg.Topic), kgo.DisableAutoCommit(), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), kgo.MinVersions(kversion.V2_1_0()), kgo.AllowAutoTopicCreation(), kgo.BlockRebalanceOnPoll(), ) if err != nil { return nil, err } return &consumer{ cfg: cfg, client: client, }, nil } // Ping checks is broker is available func (c consumer) Ping(ctx context.Context) error { return c.client.Ping(ctx) } // Read all messages based off ReadTimeout and ReadWindowTimeout func (c consumer) Read(ctx context.Context) ([]*kgo.Record, error) { if ctx == nil { ctx = context.Background() } ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout) defer cancel() windowCtx, windowCancel := context.WithTimeout(context.Background(), c.cfg.ReadWindowTimeout) defer windowCancel() var records []*kgo.Record defer c.client.AllowRebalance() // we have just one broker so this should not matter. loop: for { select { default: fetches := c.client.PollFetches(ctx) if fetches.IsClientClosed() { return records, nil } if err := fetches.Err(); err != nil && !errors.Is(err, context.DeadlineExceeded) { return nil, err } if c.cfg.ReturnOnEmptyFetch && fetches.NumRecords() == 0 { return records, nil } records = append(records, fetches.Records()...) if len(records) > c.cfg.BulkSize { break loop } case <-windowCtx.Done(): break loop } } return records, nil } // Ack commits the given fetched records func (c consumer) Ack(ctx context.Context, records ...*kgo.Record) error { // CommitUncommittedOffsets acks all un-commit records return c.client.CommitRecords(ctx, records...) } // Stop close the connection to the broker func (c consumer) Stop() { c.client.CloseAllowingRebalance() }