...
1 package kafkaclient
2
3 import (
4 "context"
5 "fmt"
6
7 "github.com/twmb/franz-go/pkg/kadm"
8 "github.com/twmb/franz-go/pkg/kgo"
9 )
10
11 type Admin struct {
12 client *kadm.Client
13 }
14
15 func NewAdmin(cfg *Config) (*Admin, error) {
16 if err := cfg.ValidForAdmin(); err != nil {
17 return nil, err
18 }
19 client, err := kgo.NewClient(kgo.SeedBrokers(cfg.Brokers...))
20 if err != nil {
21 return nil, err
22 }
23 admin := kadm.NewClient(client)
24 return &Admin{client: admin}, nil
25 }
26
27 func (a *Admin) EnsureTopic(ctx context.Context, topic string) error {
28 if topicExists, err := a.TopicExists(ctx, topic); err == nil && topicExists {
29 return nil
30 } else if err != nil {
31 return err
32 }
33 return a.CreateTopic(ctx, topic)
34 }
35
36 func (a *Admin) TopicExists(ctx context.Context, topic string) (bool, error) {
37 topicsMetadata, err := a.client.ListTopics(ctx)
38 if err != nil {
39 return false, err
40 }
41 return topicsMetadata.Has(topic), nil
42 }
43
44 func (a *Admin) CreateTopic(ctx context.Context, topic string) error {
45 resp, err := a.client.CreateTopics(ctx, 1, 1, nil, topic)
46 if err != nil {
47 return err
48 }
49 for _, ctr := range resp {
50 if ctr.Err != nil {
51 return fmt.Errorf("unable to create topic %q: %w", ctr.Topic, ctr.Err)
52 }
53 }
54 return nil
55 }
56
57 func (a *Admin) Close() {
58 a.client.Close()
59 }
60
61 type pinger struct {
62 client *kgo.Client
63 }
64
65 func NewPinger(brokers []string) (Pinger, error) {
66 client, err := kgo.NewClient(kgo.SeedBrokers(brokers...))
67 if err != nil {
68 return nil, err
69 }
70 return &pinger{client: client}, nil
71 }
72
73 func (p pinger) Ping(ctx context.Context) error {
74 return p.client.Ping(ctx)
75 }
76
View as plain text