package kafkaclient

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// Admin wraps a kadm.Client and exposes the small set of topic
// administration operations used by this package.
type Admin struct {
	client *kadm.Client
}

// NewAdmin validates cfg for admin usage and returns an Admin connected
// through the seed brokers listed in cfg.Brokers.
//
// It returns an error when cfg is nil, when cfg.ValidForAdmin rejects the
// configuration, or when the underlying kgo client cannot be constructed.
func NewAdmin(cfg *Config) (*Admin, error) {
	// Guard explicitly: cfg.Brokers below would otherwise panic on a nil cfg.
	if cfg == nil {
		return nil, fmt.Errorf("kafkaclient: nil config")
	}
	if err := cfg.ValidForAdmin(); err != nil {
		return nil, err
	}

	client, err := kgo.NewClient(kgo.SeedBrokers(cfg.Brokers...))
	if err != nil {
		return nil, fmt.Errorf("creating kafka client: %w", err)
	}

	return &Admin{client: kadm.NewClient(client)}, nil
}

// EnsureTopic makes sure topic exists, creating it via CreateTopic when
// TopicExists reports that it is absent.
//
// If creation fails, existence is checked once more: another client may
// have created the topic between the initial check and the create request,
// in which case the goal of this method is already met and nil is returned.
// Otherwise the original creation error is returned.
func (a *Admin) EnsureTopic(ctx context.Context, topic string) error {
	exists, err := a.TopicExists(ctx, topic)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}

	createErr := a.CreateTopic(ctx, topic)
	if createErr == nil {
		return nil
	}

	// Tolerate a concurrent creator; a failure of the re-check itself is
	// ignored on purpose so that the more relevant creation error surfaces.
	if exists, err := a.TopicExists(ctx, topic); err == nil && exists {
		return nil
	}
	return createErr
}

// TopicExists reports whether topic is present in the topic listing
// returned by the admin client's ListTopics call.
func (a *Admin) TopicExists(ctx context.Context, topic string) (bool, error) {
	topics, err := a.client.ListTopics(ctx)
	if err != nil {
		return false, fmt.Errorf("listing topics: %w", err)
	}
	return topics.Has(topic), nil
}

// CreateTopic requests creation of topic with 1 partition, a replication
// factor of 1 and no configuration overrides.
//
// It returns the request error if the call itself fails, or the first
// per-topic error found in the response.
func (a *Admin) CreateTopic(ctx context.Context, topic string) error {
	resp, err := a.client.CreateTopics(ctx, 1, 1, nil, topic)
	if err != nil {
		return fmt.Errorf("creating topic %q: %w", topic, err)
	}

	for _, ctr := range resp {
		if ctr.Err != nil {
			return fmt.Errorf("unable to create topic %q: %w", ctr.Topic, ctr.Err)
		}
	}
	return nil
}

// Close shuts down the admin client by delegating to the wrapped
// kadm client's Close method.
func (a *Admin) Close() {
	a.client.Close()
}

// pinger is the Pinger implementation returned by NewPinger; it holds the
// kgo client used to reach the brokers.
type pinger struct {
	client *kgo.Client
}

// NewPinger returns a Pinger backed by a kgo client that uses brokers as
// its seed brokers. It returns an error if the client cannot be constructed.
func NewPinger(brokers []string) (Pinger, error) {
	client, err := kgo.NewClient(kgo.SeedBrokers(brokers...))
	if err != nil {
		return nil, fmt.Errorf("creating kafka client: %w", err)
	}
	return &pinger{client: client}, nil
}

// Ping delegates to the underlying kgo client's Ping and returns its result.
func (p pinger) Ping(ctx context.Context) error {
	return p.client.Ping(ctx)
}