...

Source file src/edge-infra.dev/pkg/edge/datasync/kafkaclient/consumer.go

Documentation: edge-infra.dev/pkg/edge/datasync/kafkaclient

     1  package kafkaclient
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  
     8  	"github.com/twmb/franz-go/pkg/kgo"
     9  	"github.com/twmb/franz-go/pkg/kversion"
    10  )
    11  
    12  var _ Consumer = consumer{}
    13  
    14  type consumer struct {
    15  	cfg    *Config
    16  	client *kgo.Client
    17  }
    18  
    19  // NewConsumer creates topic if not available
    20  // Topic is automatically created
    21  func NewConsumer(cfg *Config) (Consumer, error) {
    22  	if err := cfg.ValidForConsumer(); err != nil {
    23  		return nil, err
    24  	}
    25  	client, err := kgo.NewClient(
    26  		kgo.SeedBrokers(cfg.Brokers...),
    27  		kgo.ConsumerGroup(fmt.Sprintf("%s%s", cfg.GroupID, cfg.Topic)),
    28  		kgo.ConsumeTopics(cfg.Topic),
    29  		kgo.DisableAutoCommit(),
    30  		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
    31  		kgo.MinVersions(kversion.V2_1_0()),
    32  		kgo.AllowAutoTopicCreation(),
    33  		kgo.BlockRebalanceOnPoll(),
    34  	)
    35  	if err != nil {
    36  		return nil, err
    37  	}
    38  	return &consumer{
    39  		cfg:    cfg,
    40  		client: client,
    41  	}, nil
    42  }
    43  
    44  // Ping checks is broker is available
    45  func (c consumer) Ping(ctx context.Context) error {
    46  	return c.client.Ping(ctx)
    47  }
    48  
    49  // Read all messages based off ReadTimeout and ReadWindowTimeout
    50  func (c consumer) Read(ctx context.Context) ([]*kgo.Record, error) {
    51  	if ctx == nil {
    52  		ctx = context.Background()
    53  	}
    54  	ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout)
    55  	defer cancel()
    56  
    57  	windowCtx, windowCancel := context.WithTimeout(context.Background(), c.cfg.ReadWindowTimeout)
    58  	defer windowCancel()
    59  
    60  	var records []*kgo.Record
    61  	defer c.client.AllowRebalance() // we have just one broker so this should not matter.
    62  loop:
    63  	for {
    64  		select {
    65  		default:
    66  			fetches := c.client.PollFetches(ctx)
    67  			if fetches.IsClientClosed() {
    68  				return records, nil
    69  			}
    70  			if err := fetches.Err(); err != nil && !errors.Is(err, context.DeadlineExceeded) {
    71  				return nil, err
    72  			}
    73  			if c.cfg.ReturnOnEmptyFetch && fetches.NumRecords() == 0 {
    74  				return records, nil
    75  			}
    76  			records = append(records, fetches.Records()...)
    77  			if len(records) > c.cfg.BulkSize {
    78  				break loop
    79  			}
    80  		case <-windowCtx.Done():
    81  			break loop
    82  		}
    83  	}
    84  	return records, nil
    85  }
    86  
    87  // Ack commits the given fetched records
    88  func (c consumer) Ack(ctx context.Context, records ...*kgo.Record) error {
    89  	// CommitUncommittedOffsets acks all un-commit records
    90  	return c.client.CommitRecords(ctx, records...)
    91  }
    92  
    93  // Stop close the connection to the broker
    94  func (c consumer) Stop() {
    95  	c.client.CloseAllowingRebalance()
    96  }
    97  

View as plain text