...

Source file src/edge-infra.dev/pkg/edge/datasync/kafkaclient/producer.go

Documentation: edge-infra.dev/pkg/edge/datasync/kafkaclient

     1  package kafkaclient
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"github.com/twmb/franz-go/pkg/kgo"
     8  	"github.com/twmb/franz-go/pkg/kversion"
     9  )
    10  
    11  var _ Producer = producer{}
    12  
    13  type producer struct {
    14  	cfg    *Config
    15  	client *kgo.Client
    16  }
    17  
    18  // NewProducer sends messages to kafka at record level or per DefaultProduceTopic
    19  // Topic is automatically created
    20  // https://github.com/twmb/franz-go/blob/master/docs/producing-and-consuming.md#offset-management
    21  func NewProducer(cfg *Config) (Producer, error) {
    22  	if err := cfg.ValidForProducer(); err != nil {
    23  		return nil, err
    24  	}
    25  	opt := []kgo.Opt{kgo.SeedBrokers(cfg.Brokers...),
    26  		kgo.ProducerBatchMaxBytes(cfg.MessageMaxBytes),
    27  		kgo.AllowAutoTopicCreation(),
    28  		kgo.MinVersions(kversion.V2_1_0()),
    29  	}
    30  	if cfg.Topic != "" {
    31  		opt = append(opt, kgo.DefaultProduceTopic(cfg.Topic))
    32  	}
    33  	client, err := kgo.NewClient(opt...)
    34  	if err != nil {
    35  		return nil, err
    36  	}
    37  	return &producer{
    38  		cfg:    cfg,
    39  		client: client,
    40  	}, nil
    41  }
    42  
    43  // Ping checks is broker is available
    44  func (p producer) Ping(ctx context.Context) error {
    45  	return p.client.Ping(ctx)
    46  }
    47  
    48  // Produce blocks and waits for broker to be available
    49  // if context is nil Config.ProduceTimeout will be used.
    50  func (p producer) Produce(ctx context.Context, records ...*kgo.Record) error {
    51  	if len(records) == 0 {
    52  		return fmt.Errorf("producer: no records to produce")
    53  	}
    54  	record := records[0]
    55  	if record.Topic == "" {
    56  		return fmt.Errorf("producer: no topic specified: %w", errTopicInvalid)
    57  	}
    58  	var cancel func()
    59  	if ctx == nil {
    60  		ctx, cancel = context.WithTimeout(context.Background(), p.cfg.ProduceTimeout)
    61  		defer cancel()
    62  	}
    63  	res := p.client.ProduceSync(ctx, records...)
    64  	if err := res.FirstErr(); err != nil {
    65  		return fmt.Errorf("producer: fail to produce message: %w", err)
    66  	}
    67  	return nil
    68  }
    69  
    70  // ReProduce publish a Record to a default topic for re processing
    71  // if context is nil Config.ProduceTimeout will be used.
    72  func (p producer) ReProduce(ctx context.Context, records ...*kgo.Record) error {
    73  	if len(records) == 0 {
    74  		return fmt.Errorf("reproduce: no records to produce")
    75  	}
    76  	if p.cfg.Topic == "" {
    77  		return fmt.Errorf("reproduce: no topic specified: %w", errTopicInvalid)
    78  	}
    79  	records = p.copy(records)
    80  	var cancel func()
    81  	if ctx == nil {
    82  		ctx, cancel = context.WithTimeout(context.Background(), p.cfg.ProduceTimeout)
    83  		defer cancel()
    84  	}
    85  	res := p.client.ProduceSync(ctx, records...)
    86  	if err := res.FirstErr(); err != nil {
    87  		return fmt.Errorf("reproduce: fail to produce message: %w", err)
    88  	}
    89  	return nil
    90  }
    91  
    92  // Stop close the connection to the broker
    93  func (p producer) Stop() {
    94  	p.client.Close()
    95  }
    96  
    97  func (p producer) copy(records []*kgo.Record) []*kgo.Record {
    98  	l := len(records)
    99  	ret := make([]*kgo.Record, l)
   100  	topic := p.cfg.Topic
   101  	for i := 0; i < l; i++ {
   102  		ret[i] = copyRecord(records[i], topic)
   103  	}
   104  	return ret
   105  }
   106  
   107  func copyRecord(record *kgo.Record, topic string) *kgo.Record {
   108  	headers := make([]kgo.RecordHeader, len(record.Headers))
   109  	copy(headers, record.Headers)
   110  	return &kgo.Record{
   111  		Topic:   topic,
   112  		Key:     record.Key,
   113  		Value:   record.Value,
   114  		Headers: headers,
   115  	}
   116  }
   117  

View as plain text