...

Source file src/github.com/twmb/franz-go/pkg/kadm/kadm.go

Documentation: github.com/twmb/franz-go/pkg/kadm

     1  // Package kadm provides a helper Kafka admin client around a *kgo.Client.
     2  //
     3  // This package is meant to cover the common use cases for dropping into an
     4  // "admin" like interface for Kafka. As with any admin client, this package
     5  // must make opinionated decisions on what to provide and what to hide. The
     6  // underlying Kafka protocol gives more detailed information in responses, or
     7  // allows more fine tuning in requests, but most of the time, these details are
     8  // unnecessary.
     9  //
    10  // By virtue of making opinionated decisions, this package cannot satisfy every
    11  // need for requests and responses. If you need more control than this admin
    12  // client provides, you can use the kmsg package directly.
    13  //
    14  // This package contains a lot of types, but the two main types to know
    15  // are Client and ShardErrors. Every other type is used for inputs or outputs
    16  // to methods on the client.
    17  //
    18  // The Client type is a simple small wrapper around a *kgo.Client that exists
    19  // solely to namespace methods. The ShardErrors type is a bit more complicated.
    20  // When issuing requests, under the hood some of these requests actually need
    21  // to be mapped to brokers and split, issuing different pieces of the input
    22  // request to different brokers. The *kgo.Client handles this all internally,
    23  // but (if using RequestSharded as directed), returns each response to each of
    24  // these split requests individually. Each response can fail or be successful.
    25  // This package goes one step further and merges these failures into one meta
    26  // failure, ShardErrors. Any function that returns ShardErrors is documented as
    27  // such, and if a function returns a non-nil ShardErrors, it is possible that
    28  // the returned data is actually valid and usable. If you care to, you can log
    29  // / react to the partial failures and continue using the partial successful
    30  // result. This is in contrast to other clients, which either require you to
    31  // request individual brokers directly, or they completely hide individual
    32  // failures, or they completely fail on any individual failure.
    33  //
    34  // For methods that list or describe things, this package often completely
    35  // fails responses on auth failures. If you use a method that accepts two
    36  // topics, one that you are authorized to and one that you are not, you will
    37  // not receive a partial successful response. Instead, you will receive an
    38  // AuthError. Methods that do *not* fail on auth errors are explicitly
    39  // documented as such.
    40  //
    41  // Users may often find it easy to work with lists of topics or partitions.
    42  // Rather than needing to build deeply nested maps directly, this package has a
    43  // few helper types that are worth knowing:
    44  //
    45  //	TopicsList  - a slice of topics and their partitions
    46  //	TopicsSet   - a set of topics, each containing a set of partitions
    47  //	Partitions  - a slice of partitions
    48  //	OffsetsList - a slice of offsets
    49  //	Offsets     - a map of offsets
    50  //
    51  // These types are meant to be easy to build and use, and can be used as the
    52  // starting point for other types.
    53  //
    54  // Many functions in this package are variadic and return either a map or a
    55  // list of responses, and you may only use one element as input and are only
    56  // interested in one element of output. This package provides the following
    57  // functions to help:
    58  //
    59  //	Any(map)
    60  //	AnyE(map, err)
    61  //	First(slice)
    62  //	FirstE(slice, err)
    63  //
    64  // The intended use case of these is something like `kadm.AnyE(kadm.CreateTopics(..., "my-one-topic"))`,
    65  // such that you can immediately get the response for the one topic you are
    66  // creating.
    67  package kadm
    68  
    69  import (
    70  	"errors"
    71  	"regexp"
    72  	"runtime/debug"
    73  	"sort"
    74  	"sync"
    75  
    76  	"github.com/twmb/franz-go/pkg/kgo"
    77  )
    78  
    79  func unptrStr(s *string) string {
    80  	if s == nil {
    81  		return ""
    82  	}
    83  	return *s
    84  }
    85  
    86  var (
    87  	reVersion     *regexp.Regexp
    88  	reVersionOnce sync.Once
    89  )
    90  
    91  // Copied from kgo, but we use the kadm package version.
    92  func softwareVersion() string {
    93  	info, ok := debug.ReadBuildInfo()
    94  	if ok {
    95  		reVersionOnce.Do(func() { reVersion = regexp.MustCompile(`^[a-zA-Z0-9](?:[a-zA-Z0-9.-]*[a-zA-Z0-9])?$`) })
    96  		for _, dep := range info.Deps {
    97  			if dep.Path == "github.com/twmb/franz-go/pkg/kadm" {
    98  				if reVersion.MatchString(dep.Version) {
    99  					return dep.Version
   100  				}
   101  			}
   102  		}
   103  	}
   104  	return "unknown"
   105  }
   106  
// Client is an admin client.
//
// This is a simple wrapper around a *kgo.Client to provide helper admin methods.
type Client struct {
	// cl is the wrapped client that NewClient was given; Close closes it.
	cl *kgo.Client

	// timeoutMillis is initialized to 15000 by NewClient and can be replaced
	// with SetTimeoutMillis.
	timeoutMillis int32
}
   115  
   116  // NewClient returns an admin client.
   117  func NewClient(cl *kgo.Client) *Client {
   118  	return &Client{cl, 15000} // 15s timeout default, matching kmsg
   119  }
   120  
   121  // NewOptClient returns a new client directly from kgo options. This is a
   122  // wrapper around creating a new *kgo.Client and then creating an admin client.
   123  func NewOptClient(opts ...kgo.Opt) (*Client, error) {
   124  	cl, err := kgo.NewClient(opts...)
   125  	if err != nil {
   126  		return nil, err
   127  	}
   128  	return NewClient(cl), nil
   129  }
   130  
   131  // Close closes the underlying *kgo.Client.
   132  func (cl *Client) Close() {
   133  	cl.cl.Close()
   134  }
   135  
   136  // SetTimeoutMillis sets the timeout to use for requests that have a timeout,
   137  // overriding the default of 15,000 (15s).
   138  //
   139  // Not all requests have timeouts. Most requests are expected to return
   140  // immediately or are expected to deliberately hang. The following requests
   141  // have timeout fields:
   142  //
   143  //	Produce
   144  //	CreateTopics
   145  //	DeleteTopics
   146  //	DeleteRecords
   147  //	CreatePartitions
   148  //	ElectLeaders
   149  //	AlterPartitionAssignments
   150  //	ListPartitionReassignments
   151  //	UpdateFeatures
   152  //
   153  // Not all requests above are supported in the admin API.
   154  func (cl *Client) SetTimeoutMillis(millis int32) {
   155  	cl.timeoutMillis = millis
   156  }
   157  
   158  // StringPtr is a shortcut function to aid building configs for creating or
   159  // altering topics.
   160  func StringPtr(s string) *string {
   161  	return &s
   162  }
   163  
// BrokerDetail is a type alias for kgo.BrokerMetadata, so values of the two
// types are interchangeable without conversion.
type BrokerDetail = kgo.BrokerMetadata
   166  
// BrokerDetails contains the details for many brokers, one BrokerDetail per
// broker.
type BrokerDetails []BrokerDetail
   169  
   170  // NodeIDs returns the IDs of all nodes.
   171  func (ds BrokerDetails) NodeIDs() []int32 {
   172  	var all []int32
   173  	for _, d := range ds {
   174  		all = append(all, d.NodeID)
   175  	}
   176  	return int32s(all)
   177  }
   178  
// Partition is a partition for a topic, identified by the topic name and the
// partition number within that topic.
type Partition struct {
	Topic     string // Topic is the topic for this partition.
	Partition int32  // Partition is this partition's number.
}
   184  
// Offset is an offset for a single partition of a topic.
type Offset struct {
	Topic       string // Topic is the topic this offset is for.
	Partition   int32  // Partition is the partition of Topic this offset is for.
	At          int64  // At is the offset to set.
	LeaderEpoch int32  // LeaderEpoch is the broker leader epoch of the record at this offset.
	Metadata    string // Metadata, if non-empty, is used for offset commits.
}
   193  
// Partitions wraps many partitions. The slice may mix partitions from
// different topics; use TopicsSet or TopicsList to group them by topic.
type Partitions []Partition
   196  
   197  // TopicsSet returns these partitions as TopicsSet.
   198  func (ps Partitions) TopicsSet() TopicsSet {
   199  	s := make(TopicsSet)
   200  	for _, p := range ps {
   201  		s.Add(p.Topic, p.Partition)
   202  	}
   203  	return s
   204  }
   205  
   206  // TopicsList returns these partitions as sorted TopicsList.
   207  func (ps Partitions) TopicsList() TopicsList {
   208  	return ps.TopicsSet().Sorted()
   209  }
   210  
// OffsetsList wraps many offsets and is a helper for building Offsets: fill
// the slice, then call the Offsets method to obtain the map form.
type OffsetsList []Offset
   213  
   214  // Offsets returns this list as the non-list Offsets. All fields in each
   215  // Offset must be set properly.
   216  func (l OffsetsList) Offsets() Offsets {
   217  	os := make(Offsets)
   218  	for _, o := range l {
   219  		os.Add(o)
   220  	}
   221  	return os
   222  }
   223  
   224  // KOffsets returns this list as a kgo offset map.
   225  func (l OffsetsList) KOffsets() map[string]map[int32]kgo.Offset {
   226  	return l.Offsets().KOffsets()
   227  }
   228  
// Offsets wraps many offsets and is the type used for offset functions. The
// outer map is keyed by topic and the inner map by partition number.
type Offsets map[string]map[int32]Offset
   231  
   232  // Lookup returns the offset at t and p and whether it exists.
   233  func (os Offsets) Lookup(t string, p int32) (Offset, bool) {
   234  	if len(os) == 0 {
   235  		return Offset{}, false
   236  	}
   237  	ps := os[t]
   238  	if len(ps) == 0 {
   239  		return Offset{}, false
   240  	}
   241  	o, exists := ps[p]
   242  	return o, exists
   243  }
   244  
   245  // Add adds an offset for a given topic/partition to this Offsets map.
   246  //
   247  // If the partition already exists, the offset is only added if:
   248  //
   249  //   - the new leader epoch is higher than the old, or
   250  //   - the leader epochs equal, and the new offset is higher than the old
   251  //
   252  // If you would like to add offsets forcefully no matter what, use the Delete
   253  // method before this.
   254  func (os *Offsets) Add(o Offset) {
   255  	if *os == nil {
   256  		*os = make(map[string]map[int32]Offset)
   257  	}
   258  	ot := (*os)[o.Topic]
   259  	if ot == nil {
   260  		ot = make(map[int32]Offset)
   261  		(*os)[o.Topic] = ot
   262  	}
   263  
   264  	prior, exists := ot[o.Partition]
   265  	if !exists || prior.LeaderEpoch < o.LeaderEpoch ||
   266  		prior.LeaderEpoch == o.LeaderEpoch && prior.At < o.At {
   267  		ot[o.Partition] = o
   268  	}
   269  }
   270  
   271  // Delete removes any offset at topic t and partition p.
   272  func (os Offsets) Delete(t string, p int32) {
   273  	if os == nil {
   274  		return
   275  	}
   276  	ot := os[t]
   277  	if ot == nil {
   278  		return
   279  	}
   280  	delete(ot, p)
   281  	if len(ot) == 0 {
   282  		delete(os, t)
   283  	}
   284  }
   285  
   286  // AddOffset is a helper to add an offset for a given topic and partition. The
   287  // leader epoch field must be -1 if you do not know the leader epoch or if
   288  // you do not have an offset yet.
   289  func (os *Offsets) AddOffset(t string, p int32, o int64, leaderEpoch int32) {
   290  	os.Add(Offset{
   291  		Topic:       t,
   292  		Partition:   p,
   293  		At:          o,
   294  		LeaderEpoch: leaderEpoch,
   295  	})
   296  }
   297  
   298  // KeepFunc calls fn for every offset, keeping the offset if fn returns true.
   299  func (os Offsets) KeepFunc(fn func(o Offset) bool) {
   300  	for t, ps := range os {
   301  		for p, o := range ps {
   302  			if !fn(o) {
   303  				delete(ps, p)
   304  			}
   305  		}
   306  		if len(ps) == 0 {
   307  			delete(os, t)
   308  		}
   309  	}
   310  }
   311  
   312  // DeleteFunc calls fn for every offset, deleting the offset if fn returns
   313  // true.
   314  func (os Offsets) DeleteFunc(fn func(o Offset) bool) {
   315  	os.KeepFunc(func(o Offset) bool { return !fn(o) })
   316  }
   317  
   318  // Topics returns the set of topics and partitions currently used in these
   319  // offsets.
   320  func (os Offsets) TopicsSet() TopicsSet {
   321  	s := make(TopicsSet)
   322  	os.Each(func(o Offset) { s.Add(o.Topic, o.Partition) })
   323  	return s
   324  }
   325  
   326  // Each calls fn for each offset in these offsets.
   327  func (os Offsets) Each(fn func(Offset)) {
   328  	for _, ps := range os {
   329  		for _, o := range ps {
   330  			fn(o)
   331  		}
   332  	}
   333  }
   334  
   335  // KOffsets returns these offsets as a kgo offset map.
   336  func (os Offsets) KOffsets() map[string]map[int32]kgo.Offset {
   337  	tskgo := make(map[string]map[int32]kgo.Offset)
   338  	for t, ps := range os {
   339  		pskgo := make(map[int32]kgo.Offset)
   340  		for p, o := range ps {
   341  			pskgo[p] = kgo.NewOffset().
   342  				At(o.At).
   343  				WithEpoch(o.LeaderEpoch)
   344  		}
   345  		tskgo[t] = pskgo
   346  	}
   347  	return tskgo
   348  }
   349  
   350  // Sorted returns the offsets sorted by topic and partition.
   351  func (os Offsets) Sorted() []Offset {
   352  	var s []Offset
   353  	os.Each(func(o Offset) { s = append(s, o) })
   354  	sort.Slice(s, func(i, j int) bool {
   355  		return s[i].Topic < s[j].Topic ||
   356  			s[i].Topic == s[j].Topic && s[i].Partition < s[j].Partition
   357  	})
   358  	return s
   359  }
   360  
   361  // OffsetsFromFetches returns Offsets for the final record in any partition in
   362  // the fetches. This is a helper to enable committing an entire returned batch.
   363  //
   364  // This function looks at only the last record per partition, assuming that the
   365  // last record is the highest offset (which is the behavior returned by kgo's
   366  // Poll functions). The returned offsets are one past the offset contained in
   367  // the records.
   368  func OffsetsFromFetches(fs kgo.Fetches) Offsets {
   369  	os := make(Offsets)
   370  	fs.EachPartition(func(p kgo.FetchTopicPartition) {
   371  		if len(p.Records) == 0 {
   372  			return
   373  		}
   374  		r := p.Records[len(p.Records)-1]
   375  		os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch)
   376  	})
   377  	return os
   378  }
   379  
   380  // OffsetsFromRecords returns offsets for all given records, using the highest
   381  // offset per partition. The returned offsets are one past the offset contained
   382  // in the records.
   383  func OffsetsFromRecords(rs ...kgo.Record) Offsets {
   384  	os := make(Offsets)
   385  	for _, r := range rs {
   386  		os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch)
   387  	}
   388  	return os
   389  }
   390  
// TopicsSet is a set of topics and, per topic, a set of partitions. The outer
// map is keyed by topic name and the inner map by partition number; a topic
// may be present with no partitions.
//
// All methods provided for TopicsSet are safe to use on a nil (default) set.
type TopicsSet map[string]map[int32]struct{}
   395  
   396  // Lookup returns whether the topic and partition exists.
   397  func (s TopicsSet) Lookup(t string, p int32) bool {
   398  	if len(s) == 0 {
   399  		return false
   400  	}
   401  	ps := s[t]
   402  	if len(ps) == 0 {
   403  		return false
   404  	}
   405  	_, exists := ps[p]
   406  	return exists
   407  }
   408  
   409  // Each calls fn for each topic / partition in the topics set.
   410  func (s TopicsSet) Each(fn func(t string, p int32)) {
   411  	for t, ps := range s {
   412  		for p := range ps {
   413  			fn(t, p)
   414  		}
   415  	}
   416  }
   417  
   418  // EachPartitions calls fn for each topic and its partitions in the topics set.
   419  func (s TopicsSet) EachPartitions(fn func(t string, ps []int32)) {
   420  	for t, ps := range s {
   421  		sliced := make([]int32, 0, len(ps))
   422  		for p := range ps {
   423  			sliced = append(sliced, p)
   424  		}
   425  		fn(t, sliced)
   426  	}
   427  }
   428  
   429  // EmptyTopics returns all topics with no partitions.
   430  func (s TopicsSet) EmptyTopics() []string {
   431  	var e []string
   432  	for t, ps := range s {
   433  		if len(ps) == 0 {
   434  			e = append(e, t)
   435  		}
   436  	}
   437  	return e
   438  }
   439  
   440  // Add adds partitions for a topic to the topics set. If no partitions are
   441  // added, this still creates the topic.
   442  func (s *TopicsSet) Add(t string, ps ...int32) {
   443  	if *s == nil {
   444  		*s = make(map[string]map[int32]struct{})
   445  	}
   446  	existing := (*s)[t]
   447  	if existing == nil {
   448  		existing = make(map[int32]struct{}, len(ps))
   449  		(*s)[t] = existing
   450  	}
   451  	for _, p := range ps {
   452  		existing[p] = struct{}{}
   453  	}
   454  }
   455  
   456  // Delete removes partitions from a topic from the topics set. If the topic
   457  // ends up with no partitions, the topic is removed from the set.
   458  func (s TopicsSet) Delete(t string, ps ...int32) {
   459  	if s == nil || len(ps) == 0 {
   460  		return
   461  	}
   462  	existing := s[t]
   463  	if existing == nil {
   464  		return
   465  	}
   466  	for _, p := range ps {
   467  		delete(existing, p)
   468  	}
   469  	if len(existing) == 0 {
   470  		delete(s, t)
   471  	}
   472  }
   473  
   474  // Topics returns all topics in this set in sorted order.
   475  func (s TopicsSet) Topics() []string {
   476  	ts := make([]string, 0, len(s))
   477  	for t := range s {
   478  		ts = append(ts, t)
   479  	}
   480  	sort.Strings(ts)
   481  	return ts
   482  }
   483  
   484  // Merge merges another topic set into this one.
   485  func (s TopicsSet) Merge(other TopicsSet) {
   486  	for t, ps := range other {
   487  		for p := range ps {
   488  			s.Add(t, p)
   489  		}
   490  	}
   491  }
   492  
   493  // IntoList returns this set as a list.
   494  func (s TopicsSet) IntoList() TopicsList {
   495  	l := make(TopicsList, 0, len(s))
   496  	for t, ps := range s {
   497  		lps := make([]int32, 0, len(ps))
   498  		for p := range ps {
   499  			lps = append(lps, p)
   500  		}
   501  		l = append(l, TopicPartitions{
   502  			Topic:      t,
   503  			Partitions: lps,
   504  		})
   505  	}
   506  	return l
   507  }
   508  
   509  // Sorted returns this set as a list in topic-sorted order, with each topic
   510  // having sorted partitions.
   511  func (s TopicsSet) Sorted() TopicsList {
   512  	l := make(TopicsList, 0, len(s))
   513  	for t, ps := range s {
   514  		tps := TopicPartitions{
   515  			Topic:      t,
   516  			Partitions: make([]int32, 0, len(ps)),
   517  		}
   518  		for p := range ps {
   519  			tps.Partitions = append(tps.Partitions, p)
   520  		}
   521  		tps.Partitions = int32s(tps.Partitions)
   522  		l = append(l, tps)
   523  	}
   524  	sort.Slice(l, func(i, j int) bool { return l[i].Topic < l[j].Topic })
   525  	return l
   526  }
   527  
// TopicPartitions is a topic and partitions.
type TopicPartitions struct {
	Topic      string  // Topic is the topic name.
	Partitions []int32 // Partitions are the partition numbers for Topic; may be empty.
}
   533  
// TopicsList is a list of topics and partitions, with one TopicPartitions
// entry per element.
type TopicsList []TopicPartitions
   536  
   537  // Each calls fn for each topic / partition in the topics list.
   538  func (l TopicsList) Each(fn func(t string, p int32)) {
   539  	for _, t := range l {
   540  		for _, p := range t.Partitions {
   541  			fn(t.Topic, p)
   542  		}
   543  	}
   544  }
   545  
   546  // EachPartitions calls fn for each topic and its partitions in the topics
   547  // list.
   548  func (l TopicsList) EachPartitions(fn func(t string, ps []int32)) {
   549  	for _, t := range l {
   550  		fn(t.Topic, t.Partitions)
   551  	}
   552  }
   553  
   554  // EmptyTopics returns all topics with no partitions.
   555  func (l TopicsList) EmptyTopics() []string {
   556  	var e []string
   557  	for _, t := range l {
   558  		if len(t.Partitions) == 0 {
   559  			e = append(e, t.Topic)
   560  		}
   561  	}
   562  	return e
   563  }
   564  
   565  // Topics returns all topics in this set in sorted order.
   566  func (l TopicsList) Topics() []string {
   567  	ts := make([]string, 0, len(l))
   568  	for _, t := range l {
   569  		ts = append(ts, t.Topic)
   570  	}
   571  	sort.Strings(ts)
   572  	return ts
   573  }
   574  
   575  // IntoSet returns this list as a set.
   576  func (l TopicsList) IntoSet() TopicsSet {
   577  	s := make(TopicsSet)
   578  	for _, t := range l {
   579  		s.Add(t.Topic, t.Partitions...)
   580  	}
   581  	return s
   582  }
   583  
   584  // First returns the first element of the input slice and whether it exists.
   585  // This is the non-error-accepting equivalent of FirstE.
   586  //
   587  // Many client methods in kadm accept a variadic amount of input arguments and
   588  // return either a slice or a map of responses, but you often use the method
   589  // with only one argument. This function can help extract the one response you
   590  // are interested in.
   591  func First[S ~[]T, T any](s S) (T, bool) {
   592  	if len(s) == 0 {
   593  		var t T
   594  		return t, false
   595  	}
   596  	return s[0], true
   597  }
   598  
   599  // Any returns the first range element of the input map and whether it exists.
   600  // This is the non-error-accepting equivalent of AnyE.
   601  //
   602  // Many client methods in kadm accept a variadic amount of input arguments and
   603  // return either a slice or a map of responses, but you often use the method
   604  // with only one argument. This function can help extract the one response you
   605  // are interested in.
   606  func Any[M ~map[K]V, K comparable, V any](m M) (V, bool) {
   607  	for _, v := range m {
   608  		return v, true
   609  	}
   610  	var v V
   611  	return v, false
   612  }
   613  
// ErrEmpty is returned from FirstE or AnyE if the input error is nil but the
// input slice or map is empty.
var ErrEmpty = errors.New("empty")
   616  
   617  // FirstE returns the first element of the input slice, or the input error
   618  // if it is non-nil. If the error is nil but the slice is empty, this returns
   619  // ErrEmpty. This is the error-accepting equivalent of First.
   620  //
   621  // Many client methods in kadm accept a variadic amount of input arguments and
   622  // return either a slice or a map of responses, but you often use the method
   623  // with only one argument. This function can help extract the one response you
   624  // are interested in.
   625  func FirstE[S ~[]T, T any](s S, err error) (T, error) {
   626  	if err != nil {
   627  		var t T
   628  		return t, err
   629  	}
   630  	if len(s) == 0 {
   631  		var t T
   632  		return t, ErrEmpty
   633  	}
   634  	return s[0], err
   635  }
   636  
   637  // AnyE returns the first range element of the input map, or the input error if
   638  // it is non-nil. If the error is nil but the map is empty, this returns
   639  // ErrEmpty. This is the error-accepting equivalent of Any.
   640  //
   641  // Many client methods in kadm accept a variadic amount of input arguments and
   642  // return either a slice or a map of responses, but you often use the method
   643  // with only one argument. This function can help extract the one response you
   644  // are interested in.
   645  func AnyE[M ~map[K]V, K comparable, V any](m M, err error) (V, error) {
   646  	if err != nil {
   647  		var v V
   648  		return v, err
   649  	}
   650  	for _, v := range m {
   651  		return v, nil
   652  	}
   653  	var v V
   654  	return v, ErrEmpty
   655  }
   656  

View as plain text