...

Source file src/edge-infra.dev/pkg/edge/datasync/kafkaclient/admin.go

Documentation: edge-infra.dev/pkg/edge/datasync/kafkaclient

     1  package kafkaclient
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"github.com/twmb/franz-go/pkg/kadm"
     8  	"github.com/twmb/franz-go/pkg/kgo"
     9  )
    10  
    11  type Admin struct {
    12  	client *kadm.Client
    13  }
    14  
    15  func NewAdmin(cfg *Config) (*Admin, error) {
    16  	if err := cfg.ValidForAdmin(); err != nil {
    17  		return nil, err
    18  	}
    19  	client, err := kgo.NewClient(kgo.SeedBrokers(cfg.Brokers...))
    20  	if err != nil {
    21  		return nil, err
    22  	}
    23  	admin := kadm.NewClient(client)
    24  	return &Admin{client: admin}, nil
    25  }
    26  
    27  func (a *Admin) EnsureTopic(ctx context.Context, topic string) error {
    28  	if topicExists, err := a.TopicExists(ctx, topic); err == nil && topicExists {
    29  		return nil
    30  	} else if err != nil {
    31  		return err
    32  	}
    33  	return a.CreateTopic(ctx, topic)
    34  }
    35  
    36  func (a *Admin) TopicExists(ctx context.Context, topic string) (bool, error) {
    37  	topicsMetadata, err := a.client.ListTopics(ctx)
    38  	if err != nil {
    39  		return false, err
    40  	}
    41  	return topicsMetadata.Has(topic), nil
    42  }
    43  
    44  func (a *Admin) CreateTopic(ctx context.Context, topic string) error {
    45  	resp, err := a.client.CreateTopics(ctx, 1, 1, nil, topic)
    46  	if err != nil {
    47  		return err
    48  	}
    49  	for _, ctr := range resp {
    50  		if ctr.Err != nil {
    51  			return fmt.Errorf("unable to create topic %q: %w", ctr.Topic, ctr.Err)
    52  		}
    53  	}
    54  	return nil
    55  }
    56  
    57  func (a *Admin) Close() {
    58  	a.client.Close()
    59  }
    60  
    61  type pinger struct {
    62  	client *kgo.Client
    63  }
    64  
    65  func NewPinger(brokers []string) (Pinger, error) {
    66  	client, err := kgo.NewClient(kgo.SeedBrokers(brokers...))
    67  	if err != nil {
    68  		return nil, err
    69  	}
    70  	return &pinger{client: client}, nil
    71  }
    72  
    73  func (p pinger) Ping(ctx context.Context) error {
    74  	return p.client.Ping(ctx)
    75  }
    76  

View as plain text