package kafkaclient

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kversion"
)

// Compile-time check that producer satisfies the Producer interface.
var _ Producer = producer{}

// producer is the Producer implementation backed by a franz-go client.
type producer struct {
	cfg    *Config     // configuration the producer was built from
	client *kgo.Client // underlying franz-go client used for every call
}

// NewProducer validates cfg and builds a producer whose client is seeded with
// cfg.Brokers. Records are routed by their own Topic; when cfg.Topic is
// non-empty it is additionally registered as the client's default produce
// topic. The client is configured to allow automatic topic creation and to
// require at least Kafka 2.1.0.
// https://github.com/twmb/franz-go/blob/master/docs/producing-and-consuming.md#offset-management
//
// It returns the validation error from cfg.ValidForProducer or the error from
// kgo.NewClient, unwrapped.
func NewProducer(cfg *Config) (Producer, error) {
	if err := cfg.ValidForProducer(); err != nil {
		return nil, err
	}

	opts := []kgo.Opt{
		kgo.SeedBrokers(cfg.Brokers...),
		kgo.ProducerBatchMaxBytes(cfg.MessageMaxBytes),
		kgo.AllowAutoTopicCreation(),
		kgo.MinVersions(kversion.V2_1_0()),
	}
	if cfg.Topic != "" {
		opts = append(opts, kgo.DefaultProduceTopic(cfg.Topic))
	}

	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, err
	}

	return &producer{
		cfg:    cfg,
		client: client,
	}, nil
}

// Ping checks whether a broker is available by delegating to the client's Ping.
func (p producer) Ping(ctx context.Context) error {
	return p.client.Ping(ctx)
}

// Produce synchronously sends records and returns the first produce error, if
// any, wrapped with a "producer:" prefix.
// If ctx is nil, a context bounded by Config.ProduceTimeout is used.
//
// It returns an error without sending anything when no records are given,
// when any record is nil, or when the first record has an empty Topic (the
// latter wraps errTopicInvalid).
func (p producer) Produce(ctx context.Context, records ...*kgo.Record) error {
	if len(records) == 0 {
		return fmt.Errorf("producer: no records to produce")
	}

	// Reject nil records up front: the topic check below (and the client)
	// dereference the records and would otherwise panic.
	for i, r := range records {
		if r == nil {
			return fmt.Errorf("producer: nil record at index %d", i)
		}
	}

	// NOTE(review): only the first record's topic is validated here; later
	// records are handed to the client as they are.
	if records[0].Topic == "" {
		return fmt.Errorf("producer: no topic specified: %w", errTopicInvalid)
	}

	if ctx == nil {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(context.Background(), p.cfg.ProduceTimeout)
		defer cancel()
	}

	if err := p.client.ProduceSync(ctx, records...).FirstErr(); err != nil {
		return fmt.Errorf("producer: fail to produce message: %w", err)
	}
	return nil
}

// ReProduce publishes the records to the default topic for reprocessing.
// If ctx is nil, a context bounded by Config.ProduceTimeout is used.
func (p producer) ReProduce(ctx context.Context, records ...*kgo.Record) error { if len(records) == 0 { return fmt.Errorf("reproduce: no records to produce") } if p.cfg.Topic == "" { return fmt.Errorf("reproduce: no topic specified: %w", errTopicInvalid) } records = p.copy(records) var cancel func() if ctx == nil { ctx, cancel = context.WithTimeout(context.Background(), p.cfg.ProduceTimeout) defer cancel() } res := p.client.ProduceSync(ctx, records...) if err := res.FirstErr(); err != nil { return fmt.Errorf("reproduce: fail to produce message: %w", err) } return nil } // Stop close the connection to the broker func (p producer) Stop() { p.client.Close() } func (p producer) copy(records []*kgo.Record) []*kgo.Record { l := len(records) ret := make([]*kgo.Record, l) topic := p.cfg.Topic for i := 0; i < l; i++ { ret[i] = copyRecord(records[i], topic) } return ret } func copyRecord(record *kgo.Record, topic string) *kgo.Record { headers := make([]kgo.RecordHeader, len(record.Headers)) copy(headers, record.Headers) return &kgo.Record{ Topic: topic, Key: record.Key, Value: record.Value, Headers: headers, } }