// Package kadm provides a helper Kafka admin client around a *kgo.Client.
//
// This package is meant to cover the common use cases for dropping into an
// "admin" like interface for Kafka. As with any admin client, this package
// must make opinionated decisions on what to provide and what to hide. The
// underlying Kafka protocol gives more detailed information in responses, or
// allows more fine tuning in requests, but most of the time, these details are
// unnecessary.
//
// By virtue of making opinionated decisions, this package cannot satisfy every
// need for requests and responses. If you need more control than this admin
// client provides, you can use the kmsg package directly.
//
// This package contains a lot of types, but the main two types type to know
// are Client and ShardErrors. Every other type is used for inputs or outputs
// to methods on the client.
//
// The Client type is a simple small wrapper around a *kgo.Client that exists
// solely to namespace methods. The ShardErrors type is a bit more complicated.
// When issuing requests, under the hood some of these requests actually need
// to be mapped to brokers and split, issuing different pieces of the input
// request to different brokers. The *kgo.Client handles this all internally,
// but (if using RequestSharded as directed), returns each response to each of
// these split requests individually. Each response can fail or be successful.
// This package goes one step further and merges these failures into one meta
// failure, ShardErrors. Any function that returns ShardErrors is documented as
// such, and if a function returns a non-nil ShardErrors, it is possible that
// the returned data is actually valid and usable. If you care to, you can log
// / react to the partial failures and continue using the partial successful
// result. This is in contrast to other clients, which either require to to
// request individual brokers directly, or they completely hide individual
// failures, or they completely fail on any individual failure.
//
// For methods that list or describe things, this package often completely
// fails responses on auth failures. If you use a method that accepts two
// topics, one that you are authorized to and one that you are not, you will
// not receive a partial successful response. Instead, you will receive an
// AuthError. Methods that do *not* fail on auth errors are explicitly
// documented as such.
//
// Users may often find it easy to work with lists of topics or partitions.
// Rather than needing to build deeply nested maps directly, this package has a
// few helper types that are worth knowing:
//
//	TopicsList  - a slice of topics and their partitions
//	TopicsSet   - a set of topics, each containing a set of partitions
//	Partitions  - a slice of partitions
//	OffsetsList - a slice of offsets
//	Offsets     - a map of offsets
//
// These types are meant to be easy to build and use, and can be used as the
// starting point for other types.
//
// Many functions in this package are variadic and return either a map or a
// list of responses, and you may only use one element as input and are only
// interested in one element of output. This package provides the following
// functions to help:
//
//	Any(map)
//	AnyE(map, err)
//	First(slice)
//	FirstE(slice, err)
//
// The intended use case of these is something like `kadm.AnyE(kadm.CreateTopics(..., "my-one-topic"))`,
// such that you can immediately get the response for the one topic you are
// creating.
package kadm

import (
	"errors"
	"regexp"
	"runtime/debug"
	"sort"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"
)

func unptrStr(s *string) string {
	if s == nil {
		return ""
	}
	return *s
}

var (
	reVersion     *regexp.Regexp
	reVersionOnce sync.Once
)

// Copied from kgo, but we use the kadm package version.
func softwareVersion() string {
	info, ok := debug.ReadBuildInfo()
	if ok {
		reVersionOnce.Do(func() { reVersion = regexp.MustCompile(`^[a-zA-Z0-9](?:[a-zA-Z0-9.-]*[a-zA-Z0-9])?$`) })
		for _, dep := range info.Deps {
			if dep.Path == "github.com/twmb/franz-go/pkg/kadm" {
				if reVersion.MatchString(dep.Version) {
					return dep.Version
				}
			}
		}
	}
	return "unknown"
}

// Client is an admin client.
//
// This is a simple wrapper around a *kgo.Client to provide helper admin methods.
type Client struct {
	cl *kgo.Client

	timeoutMillis int32
}

// NewClient returns an admin client.
func NewClient(cl *kgo.Client) *Client {
	return &Client{cl, 15000} // 15s timeout default, matching kmsg
}

// NewOptClient returns a new client directly from kgo options. This is a
// wrapper around creating a new *kgo.Client and then creating an admin client.
func NewOptClient(opts ...kgo.Opt) (*Client, error) {
	cl, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, err
	}
	return NewClient(cl), nil
}

// Close closes the underlying *kgo.Client.
func (cl *Client) Close() {
	cl.cl.Close()
}

// SetTimeoutMillis sets the timeout to use for requests that have a timeout,
// overriding the default of 15,000 (15s).
//
// Not all requests have timeouts. Most requests are expected to return
// immediately or are expected to deliberately hang. The following requests
// have timeout fields:
//
//	Produce
//	CreateTopics
//	DeleteTopics
//	DeleteRecords
//	CreatePartitions
//	ElectLeaders
//	AlterPartitionAssignments
//	ListPartitionReassignments
//	UpdateFeatures
//
// Not all requests above are supported in the admin API.
func (cl *Client) SetTimeoutMillis(millis int32) {
	cl.timeoutMillis = millis
}

// StringPtr is a shortcut function to aid building configs for creating or
// altering topics.
func StringPtr(s string) *string {
	return &s
}

// BrokerDetail is a type alias for kgo.BrokerMetadata.
type BrokerDetail = kgo.BrokerMetadata

// BrokerDetails contains the details for many brokers.
type BrokerDetails []BrokerDetail

// NodeIDs returns the IDs of all nodes.
func (ds BrokerDetails) NodeIDs() []int32 {
	var all []int32
	for _, d := range ds {
		all = append(all, d.NodeID)
	}
	return int32s(all)
}

// Partition is a partition for a topic.
type Partition struct {
	Topic     string // Topic is the topic for this partition.
	Partition int32  // Partition is this partition's number.
}

// Offset is an offset for a topic.
type Offset struct {
	Topic       string
	Partition   int32
	At          int64  // Offset is the partition to set.
	LeaderEpoch int32  // LeaderEpoch is the broker leader epoch of the record at this offset.
	Metadata    string // Metadata, if non-empty, is used for offset commits.
}

// Partitions wraps many partitions.
type Partitions []Partition

// TopicsSet returns these partitions as TopicsSet.
func (ps Partitions) TopicsSet() TopicsSet {
	s := make(TopicsSet)
	for _, p := range ps {
		s.Add(p.Topic, p.Partition)
	}
	return s
}

// TopicsList returns these partitions as sorted TopicsList.
func (ps Partitions) TopicsList() TopicsList {
	return ps.TopicsSet().Sorted()
}

// OffsetsList wraps many offsets and is a helper for building Offsets.
type OffsetsList []Offset

// Offsets returns this list as the non-list Offsets. All fields in each
// Offset must be set properly.
func (l OffsetsList) Offsets() Offsets {
	os := make(Offsets)
	for _, o := range l {
		os.Add(o)
	}
	return os
}

// KOffsets returns this list as a kgo offset map.
func (l OffsetsList) KOffsets() map[string]map[int32]kgo.Offset {
	return l.Offsets().KOffsets()
}

// Offsets wraps many offsets and is the type used for offset functions.
type Offsets map[string]map[int32]Offset

// Lookup returns the offset at t and p and whether it exists.
func (os Offsets) Lookup(t string, p int32) (Offset, bool) {
	if len(os) == 0 {
		return Offset{}, false
	}
	ps := os[t]
	if len(ps) == 0 {
		return Offset{}, false
	}
	o, exists := ps[p]
	return o, exists
}

// Add adds an offset for a given topic/partition to this Offsets map.
//
// If the partition already exists, the offset is only added if:
//
//   - the new leader epoch is higher than the old, or
//   - the leader epochs equal, and the new offset is higher than the old
//
// If you would like to add offsets forcefully no matter what, use the Delete
// method before this.
func (os *Offsets) Add(o Offset) {
	if *os == nil {
		*os = make(map[string]map[int32]Offset)
	}
	ot := (*os)[o.Topic]
	if ot == nil {
		ot = make(map[int32]Offset)
		(*os)[o.Topic] = ot
	}

	prior, exists := ot[o.Partition]
	if !exists || prior.LeaderEpoch < o.LeaderEpoch ||
		prior.LeaderEpoch == o.LeaderEpoch && prior.At < o.At {
		ot[o.Partition] = o
	}
}

// Delete removes any offset at topic t and partition p.
func (os Offsets) Delete(t string, p int32) {
	if os == nil {
		return
	}
	ot := os[t]
	if ot == nil {
		return
	}
	delete(ot, p)
	if len(ot) == 0 {
		delete(os, t)
	}
}

// AddOffset is a helper to add an offset for a given topic and partition. The
// leader epoch field must be -1 if you do not know the leader epoch or if
// you do not have an offset yet.
func (os *Offsets) AddOffset(t string, p int32, o int64, leaderEpoch int32) {
	os.Add(Offset{
		Topic:       t,
		Partition:   p,
		At:          o,
		LeaderEpoch: leaderEpoch,
	})
}

// KeepFunc calls fn for every offset, keeping the offset if fn returns true.
func (os Offsets) KeepFunc(fn func(o Offset) bool) {
	for t, ps := range os {
		for p, o := range ps {
			if !fn(o) {
				delete(ps, p)
			}
		}
		if len(ps) == 0 {
			delete(os, t)
		}
	}
}

// DeleteFunc calls fn for every offset, deleting the offset if fn returns
// true.
func (os Offsets) DeleteFunc(fn func(o Offset) bool) {
	os.KeepFunc(func(o Offset) bool { return !fn(o) })
}

// Topics returns the set of topics and partitions currently used in these
// offsets.
func (os Offsets) TopicsSet() TopicsSet {
	s := make(TopicsSet)
	os.Each(func(o Offset) { s.Add(o.Topic, o.Partition) })
	return s
}

// Each calls fn for each offset in these offsets.
func (os Offsets) Each(fn func(Offset)) {
	for _, ps := range os {
		for _, o := range ps {
			fn(o)
		}
	}
}

// KOffsets returns these offsets as a kgo offset map.
func (os Offsets) KOffsets() map[string]map[int32]kgo.Offset {
	tskgo := make(map[string]map[int32]kgo.Offset)
	for t, ps := range os {
		pskgo := make(map[int32]kgo.Offset)
		for p, o := range ps {
			pskgo[p] = kgo.NewOffset().
				At(o.At).
				WithEpoch(o.LeaderEpoch)
		}
		tskgo[t] = pskgo
	}
	return tskgo
}

// Sorted returns the offsets sorted by topic and partition.
func (os Offsets) Sorted() []Offset {
	var s []Offset
	os.Each(func(o Offset) { s = append(s, o) })
	sort.Slice(s, func(i, j int) bool {
		return s[i].Topic < s[j].Topic ||
			s[i].Topic == s[j].Topic && s[i].Partition < s[j].Partition
	})
	return s
}

// OffsetsFromFetches returns Offsets for the final record in any partition in
// the fetches. This is a helper to enable committing an entire returned batch.
//
// This function looks at only the last record per partition, assuming that the
// last record is the highest offset (which is the behavior returned by kgo's
// Poll functions). The returned offsets are one past the offset contained in
// the records.
func OffsetsFromFetches(fs kgo.Fetches) Offsets {
	os := make(Offsets)
	fs.EachPartition(func(p kgo.FetchTopicPartition) {
		if len(p.Records) == 0 {
			return
		}
		r := p.Records[len(p.Records)-1]
		os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch)
	})
	return os
}

// OffsetsFromRecords returns offsets for all given records, using the highest
// offset per partition. The returned offsets are one past the offset contained
// in the records.
func OffsetsFromRecords(rs ...kgo.Record) Offsets {
	os := make(Offsets)
	for _, r := range rs {
		os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch)
	}
	return os
}

// TopicsSet is a set of topics and, per topic, a set of partitions.
//
// All methods provided for TopicsSet are safe to use on a nil (default) set.
type TopicsSet map[string]map[int32]struct{}

// Lookup returns whether the topic and partition exists.
func (s TopicsSet) Lookup(t string, p int32) bool {
	if len(s) == 0 {
		return false
	}
	ps := s[t]
	if len(ps) == 0 {
		return false
	}
	_, exists := ps[p]
	return exists
}

// Each calls fn for each topic / partition in the topics set.
func (s TopicsSet) Each(fn func(t string, p int32)) {
	for t, ps := range s {
		for p := range ps {
			fn(t, p)
		}
	}
}

// EachPartitions calls fn for each topic and its partitions in the topics set.
func (s TopicsSet) EachPartitions(fn func(t string, ps []int32)) {
	for t, ps := range s {
		sliced := make([]int32, 0, len(ps))
		for p := range ps {
			sliced = append(sliced, p)
		}
		fn(t, sliced)
	}
}

// EmptyTopics returns all topics with no partitions.
func (s TopicsSet) EmptyTopics() []string {
	var e []string
	for t, ps := range s {
		if len(ps) == 0 {
			e = append(e, t)
		}
	}
	return e
}

// Add adds partitions for a topic to the topics set. If no partitions are
// added, this still creates the topic.
func (s *TopicsSet) Add(t string, ps ...int32) {
	if *s == nil {
		*s = make(map[string]map[int32]struct{})
	}
	existing := (*s)[t]
	if existing == nil {
		existing = make(map[int32]struct{}, len(ps))
		(*s)[t] = existing
	}
	for _, p := range ps {
		existing[p] = struct{}{}
	}
}

// Delete removes partitions from a topic from the topics set. If the topic
// ends up with no partitions, the topic is removed from the set.
func (s TopicsSet) Delete(t string, ps ...int32) {
	if s == nil || len(ps) == 0 {
		return
	}
	existing := s[t]
	if existing == nil {
		return
	}
	for _, p := range ps {
		delete(existing, p)
	}
	if len(existing) == 0 {
		delete(s, t)
	}
}

// Topics returns all topics in this set in sorted order.
func (s TopicsSet) Topics() []string {
	ts := make([]string, 0, len(s))
	for t := range s {
		ts = append(ts, t)
	}
	sort.Strings(ts)
	return ts
}

// Merge merges another topic set into this one.
func (s TopicsSet) Merge(other TopicsSet) {
	for t, ps := range other {
		for p := range ps {
			s.Add(t, p)
		}
	}
}

// IntoList returns this set as a list.
func (s TopicsSet) IntoList() TopicsList {
	l := make(TopicsList, 0, len(s))
	for t, ps := range s {
		lps := make([]int32, 0, len(ps))
		for p := range ps {
			lps = append(lps, p)
		}
		l = append(l, TopicPartitions{
			Topic:      t,
			Partitions: lps,
		})
	}
	return l
}

// Sorted returns this set as a list in topic-sorted order, with each topic
// having sorted partitions.
func (s TopicsSet) Sorted() TopicsList {
	l := make(TopicsList, 0, len(s))
	for t, ps := range s {
		tps := TopicPartitions{
			Topic:      t,
			Partitions: make([]int32, 0, len(ps)),
		}
		for p := range ps {
			tps.Partitions = append(tps.Partitions, p)
		}
		tps.Partitions = int32s(tps.Partitions)
		l = append(l, tps)
	}
	sort.Slice(l, func(i, j int) bool { return l[i].Topic < l[j].Topic })
	return l
}

// TopicPartitions is a topic and partitions.
type TopicPartitions struct {
	Topic      string
	Partitions []int32
}

// TopicsList is a list of topics and partitions.
type TopicsList []TopicPartitions

// Each calls fn for each topic / partition in the topics list.
func (l TopicsList) Each(fn func(t string, p int32)) {
	for _, t := range l {
		for _, p := range t.Partitions {
			fn(t.Topic, p)
		}
	}
}

// EachPartitions calls fn for each topic and its partitions in the topics
// list.
func (l TopicsList) EachPartitions(fn func(t string, ps []int32)) {
	for _, t := range l {
		fn(t.Topic, t.Partitions)
	}
}

// EmptyTopics returns all topics with no partitions.
func (l TopicsList) EmptyTopics() []string {
	var e []string
	for _, t := range l {
		if len(t.Partitions) == 0 {
			e = append(e, t.Topic)
		}
	}
	return e
}

// Topics returns all topics in this set in sorted order.
func (l TopicsList) Topics() []string {
	ts := make([]string, 0, len(l))
	for _, t := range l {
		ts = append(ts, t.Topic)
	}
	sort.Strings(ts)
	return ts
}

// IntoSet returns this list as a set.
func (l TopicsList) IntoSet() TopicsSet {
	s := make(TopicsSet)
	for _, t := range l {
		s.Add(t.Topic, t.Partitions...)
	}
	return s
}

// First returns the first element of the input slice and whether it exists.
// This is the non-error-accepting equivalent of FirstE.
//
// Many client methods in kadm accept a variadic amount of input arguments and
// return either a slice or a map of responses, but you often use the method
// with only one argument. This function can help extract the one response you
// are interested in.
func First[S ~[]T, T any](s S) (T, bool) {
	if len(s) == 0 {
		var t T
		return t, false
	}
	return s[0], true
}

// Any returns the first range element of the input map and whether it exists.
// This is the non-error-accepting equivalent of AnyE.
//
// Many client methods in kadm accept a variadic amount of input arguments and
// return either a slice or a map of responses, but you often use the method
// with only one argument. This function can help extract the one response you
// are interested in.
func Any[M ~map[K]V, K comparable, V any](m M) (V, bool) {
	for _, v := range m {
		return v, true
	}
	var v V
	return v, false
}

// ErrEmpty is returned from FirstE or AnyE if the input is empty.
var ErrEmpty = errors.New("empty")

// FirstE returns the first element of the input slice, or the input error
// if it is non-nil. If the error is nil but the slice is empty, this returns
// ErrEmpty. This is the error-accepting equivalent of First.
//
// Many client methods in kadm accept a variadic amount of input arguments and
// return either a slice or a map of responses, but you often use the method
// with only one argument. This function can help extract the one response you
// are interested in.
func FirstE[S ~[]T, T any](s S, err error) (T, error) {
	if err != nil {
		var t T
		return t, err
	}
	if len(s) == 0 {
		var t T
		return t, ErrEmpty
	}
	return s[0], err
}

// AnyE returns the first range element of the input map, or the input error if
// it is non-nil. If the error is nil but the map is empty, this returns
// ErrEmpty. This is the error-accepting equivalent of Any.
//
// Many client methods in kadm accept a variadic amount of input arguments and
// return either a slice or a map of responses, but you often use the method
// with only one argument. This function can help extract the one response you
// are interested in.
func AnyE[M ~map[K]V, K comparable, V any](m M, err error) (V, error) {
	if err != nil {
		var v V
		return v, err
	}
	for _, v := range m {
		return v, nil
	}
	var v V
	return v, ErrEmpty
}