...

Source file src/github.com/twmb/franz-go/pkg/kadm/metadata.go

Documentation: github.com/twmb/franz-go/pkg/kadm

package kadm

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"sort"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// TopicID is the 16-byte underlying topic ID.
type TopicID [16]byte

// String returns the topic ID encoded as base64.
func (t TopicID) String() string { return base64.StdEncoding.EncodeToString(t[:]) }

// MarshalJSON returns the topic ID encoded as quoted base64.
func (t TopicID) MarshalJSON() ([]byte, error) { return []byte(`"` + t.String() + `"`), nil }

// Less returns whether this ID is less than the other, comparing byte by
// byte.
func (t TopicID) Less(other TopicID) bool {
	return bytes.Compare(t[:], other[:]) == -1
}
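
// For illustration, the all-zero ID (what brokers that do not support topic
// IDs return) stringifies as 22 'A's plus base64 padding:
//
//	var id TopicID
//	fmt.Println(id) // AAAAAAAAAAAAAAAAAAAAAA==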

// PartitionDetail is the detail of a partition as returned by a metadata
// response. If the partition fails to load / has an error, then only the
// partition number itself and the Err fields will be set.
type PartitionDetail struct {
	Topic     string // Topic is the topic this partition belongs to.
	Partition int32  // Partition is the partition number these details are for.

	Leader          int32   // Leader is the broker leader, if there is one, otherwise -1.
	LeaderEpoch     int32   // LeaderEpoch is the leader's current epoch.
	Replicas        []int32 // Replicas is the list of replicas.
	ISR             []int32 // ISR is the list of in sync replicas.
	OfflineReplicas []int32 // OfflineReplicas is the list of offline replicas.

	Err error // Err is non-nil if the partition currently has a load error.
}

// PartitionDetails contains details for partitions as returned by a metadata
// response.
type PartitionDetails map[int32]PartitionDetail

// Sorted returns the partitions in sorted order.
func (ds PartitionDetails) Sorted() []PartitionDetail {
	s := make([]PartitionDetail, 0, len(ds))
	for _, d := range ds {
		s = append(s, d)
	}
	sort.Slice(s, func(i, j int) bool { return s[i].Partition < s[j].Partition })
	return s
}

// Numbers returns a sorted list of all partition numbers.
func (ds PartitionDetails) Numbers() []int32 {
	all := make([]int32, 0, len(ds))
	for p := range ds {
		all = append(all, p)
	}
	return int32s(all)
}

// NumReplicas returns the number of replicas for these partitions.
//
// It is assumed that all partitions have the same number of replicas, so this
// simply returns the number of replicas in the first encountered partition.
func (ds PartitionDetails) NumReplicas() int {
	for _, p := range ds {
		return len(p.Replicas)
	}
	return 0
}
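
// A minimal usage sketch, assuming a PartitionDetails value named pds (for
// example, a TopicDetail's Partitions field):
//
//	for _, pd := range pds.Sorted() {
//		fmt.Println(pd.Partition, pd.Leader, pd.ISR)
//	}
//	fmt.Println(pds.Numbers(), pds.NumReplicas())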

// TopicDetail is the detail of a topic as returned by a metadata response. If
// the topic fails to load / has an error, then there will be no partitions.
type TopicDetail struct {
	Topic string // Topic is the topic these details are for.

	ID         TopicID          // ID is the topic's ID, or all 0 if the broker does not support IDs.
	IsInternal bool             // IsInternal is whether the topic is an internal topic.
	Partitions PartitionDetails // Partitions contains details about the topic's partitions.

	Err error // Err is non-nil if the topic could not be loaded.
}

// TopicDetails contains details for topics as returned by a metadata response.
type TopicDetails map[string]TopicDetail

// Names returns a sorted list of all topic names.
func (ds TopicDetails) Names() []string {
	all := make([]string, 0, len(ds))
	for t := range ds {
		all = append(all, t)
	}
	sort.Strings(all)
	return all
}

// Sorted returns all topics in sorted order.
func (ds TopicDetails) Sorted() []TopicDetail {
	s := make([]TopicDetail, 0, len(ds))
	for _, d := range ds {
		s = append(s, d)
	}
	sort.Slice(s, func(i, j int) bool {
		// Topics with an empty name (e.g. ones that failed to load by
		// ID) sort first, ordered by ID.
		if s[i].Topic == "" {
			if s[j].Topic == "" {
				return bytes.Compare(s[i].ID[:], s[j].ID[:]) == -1
			}
			return true
		}
		if s[j].Topic == "" {
			return false
		}
		return s[i].Topic < s[j].Topic
	})
	return s
}

// Has returns whether the topic details contain the given topic and, if so,
// that the topic's load error is not an unknown topic error.
func (ds TopicDetails) Has(topic string) bool {
	d, ok := ds[topic]
	return ok && d.Err != kerr.UnknownTopicOrPartition
}

// FilterInternal deletes any internal topics from this set of topic details.
func (ds TopicDetails) FilterInternal() {
	for t, d := range ds {
		if d.IsInternal {
			delete(ds, t)
		}
	}
}

// EachPartition calls fn for every partition in all topics.
func (ds TopicDetails) EachPartition(fn func(PartitionDetail)) {
	for _, td := range ds {
		for _, d := range td.Partitions {
			fn(d)
		}
	}
}

// EachError calls fn for each topic that could not be loaded.
func (ds TopicDetails) EachError(fn func(TopicDetail)) {
	for _, td := range ds {
		if td.Err != nil {
			fn(td)
		}
	}
}

// Error iterates over all topic details and returns the first error
// encountered, if any.
func (ds TopicDetails) Error() error {
	for _, t := range ds {
		if t.Err != nil {
			return t.Err
		}
	}
	return nil
}

// TopicsSet returns the topics and partitions as a set.
func (ds TopicDetails) TopicsSet() TopicsSet {
	var s TopicsSet
	ds.EachPartition(func(d PartitionDetail) {
		s.Add(d.Topic, d.Partition)
	})
	return s
}

// TopicsList returns the topics and partitions as a list.
func (ds TopicDetails) TopicsList() TopicsList {
	return ds.TopicsSet().Sorted()
}
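
// A minimal usage sketch, assuming a TopicDetails value named tds (for
// example, the Topics field of the Metadata struct below):
//
//	if err := tds.Error(); err != nil {
//		// at least one topic failed to load
//	}
//	tds.FilterInternal()
//	set := tds.TopicsSet() // usable with other kadm APIs that accept a TopicsSet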

// Metadata is the data from a metadata response.
type Metadata struct {
	Cluster    string        // Cluster is the cluster name, if any.
	Controller int32         // Controller is the node ID of the controller broker, if available, otherwise -1.
	Brokers    BrokerDetails // Brokers contains broker details, sorted by default.
	Topics     TopicDetails  // Topics contains topic details.
}

func int32s(is []int32) []int32 {
	sort.Slice(is, func(i, j int) bool { return is[i] < is[j] })
	return is
}

// ListBrokers issues a metadata request and returns BrokerDetails. This
// returns an error if the request fails to be issued, or an *AuthError.
func (cl *Client) ListBrokers(ctx context.Context) (BrokerDetails, error) {
	m, err := cl.Metadata(ctx)
	if err != nil {
		return nil, err
	}
	return m.Brokers, nil
}

// BrokerMetadata issues a metadata request that asks for no topics and
// returns the response.
//
// This returns an error if the request fails to be issued, or an *AuthError.
func (cl *Client) BrokerMetadata(ctx context.Context) (Metadata, error) {
	return cl.metadata(ctx, true, nil)
}

// Metadata issues a metadata request and returns it. Specific topics to
// describe can be passed as additional arguments. If no topics are specified,
// all topics are requested.
//
// This returns an error if the request fails to be issued, or an *AuthError.
func (cl *Client) Metadata(
	ctx context.Context,
	topics ...string,
) (Metadata, error) {
	return cl.metadata(ctx, false, topics)
}
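
// A minimal usage sketch, assuming a configured *kgo.Client named kcl from
// which an admin client is derived via this package's NewClient:
//
//	adm := NewClient(kcl)
//	m, err := adm.Metadata(ctx, "my-topic")
//	if err != nil {
//		// the request failed to be issued, or auth failed
//	}
//	fmt.Println(m.Cluster, m.Controller)
//	for _, td := range m.Topics.Sorted() {
//		fmt.Println(td.Topic, len(td.Partitions))
//	}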

func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string) (Metadata, error) {
	req := kmsg.NewPtrMetadataRequest()
	for _, t := range topics {
		rt := kmsg.NewMetadataRequestTopic()
		rt.Topic = kmsg.StringPtr(t)
		req.Topics = append(req.Topics, rt)
	}
	if noTopics {
		// A non-nil empty slice requests no topics; leaving Topics nil
		// requests all topics.
		req.Topics = []kmsg.MetadataRequestTopic{}
	}
	resp, err := req.RequestWith(ctx, cl.cl)
	if err != nil {
		return Metadata{}, err
	}

	tds := make(map[string]TopicDetail, len(resp.Topics))
	for _, t := range resp.Topics {
		if err := maybeAuthErr(t.ErrorCode); err != nil {
			return Metadata{}, err
		}
		td := TopicDetail{
			ID:         t.TopicID,
			Partitions: make(map[int32]PartitionDetail),
			IsInternal: t.IsInternal,
			Err:        kerr.ErrorForCode(t.ErrorCode),
		}
		if t.Topic != nil {
			td.Topic = *t.Topic
		}
		for _, p := range t.Partitions {
			td.Partitions[p.Partition] = PartitionDetail{
				Topic:     td.Topic,
				Partition: p.Partition,

				Leader:          p.Leader,
				LeaderEpoch:     p.LeaderEpoch,
				Replicas:        p.Replicas,
				ISR:             p.ISR,
				OfflineReplicas: p.OfflineReplicas,

				Err: kerr.ErrorForCode(p.ErrorCode),
			}
		}
		tds[td.Topic] = td // use the nil-guarded name; t.Topic itself may be nil
	}

	m := Metadata{
		Controller: resp.ControllerID,
		Topics:     tds,
	}
	if resp.ClusterID != nil {
		m.Cluster = *resp.ClusterID
	}

	for _, b := range resp.Brokers {
		m.Brokers = append(m.Brokers, kgo.BrokerMetadata{
			NodeID: b.NodeID,
			Host:   b.Host,
			Port:   b.Port,
			Rack:   b.Rack,
		})
	}
	sort.Slice(m.Brokers, func(i, j int) bool { return m.Brokers[i].NodeID < m.Brokers[j].NodeID })

	if len(topics) > 0 && len(m.Topics) != len(topics) {
		return Metadata{}, fmt.Errorf("metadata returned only %d topics of %d requested", len(m.Topics), len(topics))
	}

	return m, nil
}

// ListedOffset contains record offset information.
type ListedOffset struct {
	Topic     string // Topic is the topic this offset is for.
	Partition int32  // Partition is the partition this offset is for.

	Timestamp   int64 // Timestamp is the millisecond of the offset if listing after a time, otherwise -1.
	Offset      int64 // Offset is the record offset, or -1 if one could not be found.
	LeaderEpoch int32 // LeaderEpoch is the leader epoch at this offset, if any, otherwise -1.

	Err error // Err is non-nil if the partition has a load error.
}

// ListedOffsets contains per-partition record offset information that is
// returned from any of the List.*Offsets functions.
type ListedOffsets map[string]map[int32]ListedOffset

// Lookup returns the offset at t and p and whether it exists.
func (l ListedOffsets) Lookup(t string, p int32) (ListedOffset, bool) {
	if len(l) == 0 {
		return ListedOffset{}, false
	}
	ps := l[t]
	if len(ps) == 0 {
		return ListedOffset{}, false
	}
	o, exists := ps[p]
	return o, exists
}

// Each calls fn for each listed offset.
func (l ListedOffsets) Each(fn func(ListedOffset)) {
	for _, ps := range l {
		for _, o := range ps {
			fn(o)
		}
	}
}

// Error iterates over all offsets and returns the first error encountered, if
// any. This can be used to check whether a listing was entirely successful.
//
// Note that offset listing can be partially successful. For example, some
// offsets could be listed successfully while others fail (maybe one partition
// is offline). If this is something you need to worry about, you may need to
// check all offsets manually.
func (l ListedOffsets) Error() error {
	for _, ps := range l {
		for _, o := range ps {
			if o.Err != nil {
				return o.Err
			}
		}
	}
	return nil
}

// Offsets returns these listed offsets as offsets.
func (l ListedOffsets) Offsets() Offsets {
	o := make(Offsets)
	l.Each(func(l ListedOffset) {
		o.Add(Offset{
			Topic:       l.Topic,
			Partition:   l.Partition,
			At:          l.Offset,
			LeaderEpoch: l.LeaderEpoch,
		})
	})
	return o
}

// KOffsets returns these listed offsets as a kgo offset map.
func (l ListedOffsets) KOffsets() map[string]map[int32]kgo.Offset {
	return l.Offsets().KOffsets()
}
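
// A minimal usage sketch, assuming listed is a ListedOffsets from one of the
// List.*Offsets functions below:
//
//	if o, ok := listed.Lookup("my-topic", 0); ok && o.Err == nil {
//		fmt.Println(o.Offset)
//	}
//
// The kgo map form can seed a consumer at exact offsets, e.g. with the
// kgo.ConsumePartitions option:
//
//	consumer, err := kgo.NewClient(kgo.ConsumePartitions(listed.KOffsets()))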

// ListStartOffsets returns the start (oldest) offsets for each partition in
// each requested topic. In Kafka terms, this returns the log start offset. If
// no topics are specified, all topics are listed.
//
// If any topic being listed does not exist, a special -1 partition is added
// to the response with the expected error code kerr.UnknownTopicOrPartition.
//
// This may return *ShardErrors.
func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
	return cl.listOffsets(ctx, 0, -2, topics) // timestamp -2: the earliest offset
}

// ListEndOffsets returns the end (newest) offsets for each partition in each
// requested topic. In Kafka terms, this returns high watermarks. If no topics
// are specified, all topics are listed.
//
// If any topic being listed does not exist, a special -1 partition is added
// to the response with the expected error code kerr.UnknownTopicOrPartition.
//
// This may return *ShardErrors.
func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
	return cl.listOffsets(ctx, 0, -1, topics) // timestamp -1: the latest offset
}

// ListCommittedOffsets returns newest committed offsets for each partition in
// each requested topic. A committed offset may be slightly less than the
// latest offset. In Kafka terms, committed means the last stable offset, and
// newest means the high watermark. Record offsets in active, uncommitted
// transactions will not be returned. If no topics are specified, all topics
// are listed.
//
// If any topic being listed does not exist, a special -1 partition is added
// to the response with the expected error code kerr.UnknownTopicOrPartition.
//
// This may return *ShardErrors.
func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
	return cl.listOffsets(ctx, 1, -1, topics) // isolation 1: read committed
}

// ListOffsetsAfterMilli returns the first offsets after the requested
// millisecond timestamp. Unlike listing start/end/committed offsets, offsets
// returned from this function also include the timestamp of the offset. If no
// topics are specified, all topics are listed. If a partition has no offsets
// after the requested millisecond, the offset will be the current end offset.
//
// If any topic being listed does not exist, a special -1 partition is added
// to the response with the expected error code kerr.UnknownTopicOrPartition.
//
// This may return *ShardErrors.
func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error) {
	return cl.listOffsets(ctx, 0, millisecond, topics)
}
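
// A minimal usage sketch, assuming an admin client adm as above:
//
//	end, err := adm.ListEndOffsets(ctx, "my-topic")
//	if err != nil {
//		// request issuing / shard errors
//	}
//	end.Each(func(o ListedOffset) { fmt.Println(o.Partition, o.Offset) })
//
// Because partitions with no records after the requested millisecond fall
// back to their end offsets, listing after a far-future timestamp is
// equivalent to listing end offsets.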

// listOffsets issues list offsets requests: isolation 0 is read_uncommitted
// and 1 is read_committed; timestamp -2 lists start offsets, -1 lists end
// offsets, and any other value lists the first offset after that millisecond.
func (cl *Client) listOffsets(ctx context.Context, isolation int8, timestamp int64, topics []string) (ListedOffsets, error) {
	tds, err := cl.ListTopics(ctx, topics...)
	if err != nil {
		return nil, err
	}

	// If we request with timestamps, we may request twice: once for after
	// timestamps, and once for any -1 (and no error) offsets, which mean
	// the timestamp is in the future and we list the end offset instead.
	list := make(ListedOffsets)

	for _, td := range tds {
		if td.Err != nil {
			list[td.Topic] = map[int32]ListedOffset{
				-1: {
					Topic:     td.Topic,
					Partition: -1,
					Err:       td.Err,
				},
			}
		}
	}
	rerequest := make(map[string][]int32)
	shardfn := func(kr kmsg.Response) error {
		resp := kr.(*kmsg.ListOffsetsResponse)
		for _, t := range resp.Topics {
			lt, ok := list[t.Topic]
			if !ok {
				lt = make(map[int32]ListedOffset)
				list[t.Topic] = lt
			}
			for _, p := range t.Partitions {
				if err := maybeAuthErr(p.ErrorCode); err != nil {
					return err
				}
				lt[p.Partition] = ListedOffset{
					Topic:       t.Topic,
					Partition:   p.Partition,
					Timestamp:   p.Timestamp,
					Offset:      p.Offset,
					LeaderEpoch: p.LeaderEpoch,
					Err:         kerr.ErrorForCode(p.ErrorCode),
				}
				if timestamp != -1 && p.Offset == -1 && p.ErrorCode == 0 {
					rerequest[t.Topic] = append(rerequest[t.Topic], p.Partition)
				}
			}
		}
		return nil
	}

	req := kmsg.NewPtrListOffsetsRequest()
	req.IsolationLevel = isolation
	for t, td := range tds {
		if td.Err != nil {
			continue
		}
		rt := kmsg.NewListOffsetsRequestTopic()
		rt.Topic = t
		for p := range td.Partitions {
			rp := kmsg.NewListOffsetsRequestTopicPartition()
			rp.Partition = p
			rp.Timestamp = timestamp
			rt.Partitions = append(rt.Partitions, rp)
		}
		req.Topics = append(req.Topics, rt)
	}
	shards := cl.cl.RequestSharded(ctx, req)
	err = shardErrEach(req, shards, shardfn)
	if len(rerequest) > 0 {
		req.Topics = req.Topics[:0]
		for t, ps := range rerequest {
			rt := kmsg.NewListOffsetsRequestTopic()
			rt.Topic = t
			for _, p := range ps {
				rp := kmsg.NewListOffsetsRequestTopicPartition()
				rp.Partition = p
				rp.Timestamp = -1 // we always list end offsets when rerequesting
				rt.Partitions = append(rt.Partitions, rp)
			}
			req.Topics = append(req.Topics, rt)
		}
		shards = cl.cl.RequestSharded(ctx, req)
		err = mergeShardErrs(err, shardErrEach(req, shards, shardfn))
	}
	return list, err
}
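
// Note the -1 sentinel partition added above for topics that fail to load:
// callers can use it to surface per-topic errors. A sketch:
//
//	listed, _ := adm.ListEndOffsets(ctx, "does-not-exist")
//	if o, ok := listed.Lookup("does-not-exist", -1); ok {
//		fmt.Println(o.Err) // expected: kerr.UnknownTopicOrPartition
//	}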