1 package kadm
2
3 import (
4 "bytes"
5 "context"
6 "encoding/base64"
7 "fmt"
8 "sort"
9
10 "github.com/twmb/franz-go/pkg/kerr"
11 "github.com/twmb/franz-go/pkg/kgo"
12 "github.com/twmb/franz-go/pkg/kmsg"
13 )
14
15
16 type TopicID [16]byte
17
18
19 func (t TopicID) String() string { return base64.StdEncoding.EncodeToString(t[:]) }
20
21
22 func (t TopicID) MarshalJSON() ([]byte, error) { return []byte(`"` + t.String() + `"`), nil }
23
24
25 func (t TopicID) Less(other TopicID) bool {
26 return bytes.Compare(t[:], other[:]) == -1
27 }
28
29
30
31
32 type PartitionDetail struct {
33 Topic string
34 Partition int32
35
36 Leader int32
37 LeaderEpoch int32
38 Replicas []int32
39 ISR []int32
40 OfflineReplicas []int32
41
42 Err error
43 }
44
45
46
47 type PartitionDetails map[int32]PartitionDetail
48
49
50 func (ds PartitionDetails) Sorted() []PartitionDetail {
51 s := make([]PartitionDetail, 0, len(ds))
52 for _, d := range ds {
53 s = append(s, d)
54 }
55 sort.Slice(s, func(i, j int) bool { return s[i].Partition < s[j].Partition })
56 return s
57 }
58
59
60 func (ds PartitionDetails) Numbers() []int32 {
61 all := make([]int32, 0, len(ds))
62 for p := range ds {
63 all = append(all, p)
64 }
65 return int32s(all)
66 }
67
68
69
70
71
72 func (ds PartitionDetails) NumReplicas() int {
73 for _, p := range ds {
74 return len(p.Replicas)
75 }
76 return 0
77 }
78
79
80
81 type TopicDetail struct {
82 Topic string
83
84 ID TopicID
85 IsInternal bool
86 Partitions PartitionDetails
87
88 Err error
89 }
90
91
92 type TopicDetails map[string]TopicDetail
93
94
95 func (ds TopicDetails) Names() []string {
96 all := make([]string, 0, len(ds))
97 for t := range ds {
98 all = append(all, t)
99 }
100 sort.Strings(all)
101 return all
102 }
103
104
105 func (ds TopicDetails) Sorted() []TopicDetail {
106 s := make([]TopicDetail, 0, len(ds))
107 for _, d := range ds {
108 s = append(s, d)
109 }
110 sort.Slice(s, func(i, j int) bool {
111 if s[i].Topic == "" {
112 if s[j].Topic == "" {
113 return bytes.Compare(s[i].ID[:], s[j].ID[:]) == -1
114 }
115 return true
116 }
117 if s[j].Topic == "" {
118 return false
119 }
120 return s[i].Topic < s[j].Topic
121 })
122 return s
123 }
124
125
126
127 func (ds TopicDetails) Has(topic string) bool {
128 d, ok := ds[topic]
129 return ok && d.Err != kerr.UnknownTopicOrPartition
130 }
131
132
133 func (ds TopicDetails) FilterInternal() {
134 for t, d := range ds {
135 if d.IsInternal {
136 delete(ds, t)
137 }
138 }
139 }
140
141
142 func (ds TopicDetails) EachPartition(fn func(PartitionDetail)) {
143 for _, td := range ds {
144 for _, d := range td.Partitions {
145 fn(d)
146 }
147 }
148 }
149
150
151 func (ds TopicDetails) EachError(fn func(TopicDetail)) {
152 for _, td := range ds {
153 if td.Err != nil {
154 fn(td)
155 }
156 }
157 }
158
159
160
161 func (ds TopicDetails) Error() error {
162 for _, t := range ds {
163 if t.Err != nil {
164 return t.Err
165 }
166 }
167 return nil
168 }
169
170
171 func (ds TopicDetails) TopicsSet() TopicsSet {
172 var s TopicsSet
173 ds.EachPartition(func(d PartitionDetail) {
174 s.Add(d.Topic, d.Partition)
175 })
176 return s
177 }
178
179
180 func (ds TopicDetails) TopicsList() TopicsList {
181 return ds.TopicsSet().Sorted()
182 }
183
184
185 type Metadata struct {
186 Cluster string
187 Controller int32
188 Brokers BrokerDetails
189 Topics TopicDetails
190 }
191
192 func int32s(is []int32) []int32 {
193 sort.Slice(is, func(i, j int) bool { return is[i] < is[j] })
194 return is
195 }
196
197
198
199 func (cl *Client) ListBrokers(ctx context.Context) (BrokerDetails, error) {
200 m, err := cl.Metadata(ctx)
201 if err != nil {
202 return nil, err
203 }
204 return m.Brokers, nil
205 }
206
207
208
209
210
211 func (cl *Client) BrokerMetadata(ctx context.Context) (Metadata, error) {
212 return cl.metadata(ctx, true, nil)
213 }
214
215
216
217
218
219
220 func (cl *Client) Metadata(
221 ctx context.Context,
222 topics ...string,
223 ) (Metadata, error) {
224 return cl.metadata(ctx, false, topics)
225 }
226
227 func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string) (Metadata, error) {
228 req := kmsg.NewPtrMetadataRequest()
229 for _, t := range topics {
230 rt := kmsg.NewMetadataRequestTopic()
231 rt.Topic = kmsg.StringPtr(t)
232 req.Topics = append(req.Topics, rt)
233 }
234 if noTopics {
235 req.Topics = []kmsg.MetadataRequestTopic{}
236 }
237 resp, err := req.RequestWith(ctx, cl.cl)
238 if err != nil {
239 return Metadata{}, err
240 }
241
242 tds := make(map[string]TopicDetail, len(resp.Topics))
243 for _, t := range resp.Topics {
244 if err := maybeAuthErr(t.ErrorCode); err != nil {
245 return Metadata{}, err
246 }
247 td := TopicDetail{
248 ID: t.TopicID,
249 Partitions: make(map[int32]PartitionDetail),
250 IsInternal: t.IsInternal,
251 Err: kerr.ErrorForCode(t.ErrorCode),
252 }
253 if t.Topic != nil {
254 td.Topic = *t.Topic
255 }
256 for _, p := range t.Partitions {
257 td.Partitions[p.Partition] = PartitionDetail{
258 Topic: td.Topic,
259 Partition: p.Partition,
260
261 Leader: p.Leader,
262 LeaderEpoch: p.LeaderEpoch,
263 Replicas: p.Replicas,
264 ISR: p.ISR,
265 OfflineReplicas: p.OfflineReplicas,
266
267 Err: kerr.ErrorForCode(p.ErrorCode),
268 }
269 }
270 tds[*t.Topic] = td
271 }
272
273 m := Metadata{
274 Controller: resp.ControllerID,
275 Topics: tds,
276 }
277 if resp.ClusterID != nil {
278 m.Cluster = *resp.ClusterID
279 }
280
281 for _, b := range resp.Brokers {
282 m.Brokers = append(m.Brokers, kgo.BrokerMetadata{
283 NodeID: b.NodeID,
284 Host: b.Host,
285 Port: b.Port,
286 Rack: b.Rack,
287 })
288 }
289 sort.Slice(m.Brokers, func(i, j int) bool { return m.Brokers[i].NodeID < m.Brokers[j].NodeID })
290
291 if len(topics) > 0 && len(m.Topics) != len(topics) {
292 return Metadata{}, fmt.Errorf("metadata returned only %d topics of %d requested", len(m.Topics), len(topics))
293 }
294
295 return m, nil
296 }
297
298
299 type ListedOffset struct {
300 Topic string
301 Partition int32
302
303 Timestamp int64
304 Offset int64
305 LeaderEpoch int32
306
307 Err error
308 }
309
310
311
312 type ListedOffsets map[string]map[int32]ListedOffset
313
314
315 func (l ListedOffsets) Lookup(t string, p int32) (ListedOffset, bool) {
316 if len(l) == 0 {
317 return ListedOffset{}, false
318 }
319 ps := l[t]
320 if len(ps) == 0 {
321 return ListedOffset{}, false
322 }
323 o, exists := ps[p]
324 return o, exists
325 }
326
327
328 func (l ListedOffsets) Each(fn func(ListedOffset)) {
329 for _, ps := range l {
330 for _, o := range ps {
331 fn(o)
332 }
333 }
334 }
335
336
337
338
339
340
341
342
343 func (l ListedOffsets) Error() error {
344 for _, ps := range l {
345 for _, o := range ps {
346 if o.Err != nil {
347 return o.Err
348 }
349 }
350 }
351 return nil
352 }
353
354
355 func (l ListedOffsets) Offsets() Offsets {
356 o := make(Offsets)
357 l.Each(func(l ListedOffset) {
358 o.Add(Offset{
359 Topic: l.Topic,
360 Partition: l.Partition,
361 At: l.Offset,
362 LeaderEpoch: l.LeaderEpoch,
363 })
364 })
365 return o
366 }
367
368
369 func (l ListedOffsets) KOffsets() map[string]map[int32]kgo.Offset {
370 return l.Offsets().KOffsets()
371 }
372
373
374
375
376
377
378
379
380
381
382
383 func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
384 return cl.listOffsets(ctx, 0, -2, topics)
385 }
386
387
388
389
390
391
392
393
394
395
396 func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
397 return cl.listOffsets(ctx, 0, -1, topics)
398 }
399
400
401
402
403
404
405
406
407
408
409
410
411
412 func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
413 return cl.listOffsets(ctx, 1, -1, topics)
414 }
415
416
417
418
419
420
421
422
423
424
425
426
427
428 func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error) {
429 return cl.listOffsets(ctx, 0, millisecond, topics)
430 }
431
432 func (cl *Client) listOffsets(ctx context.Context, isolation int8, timestamp int64, topics []string) (ListedOffsets, error) {
433 tds, err := cl.ListTopics(ctx, topics...)
434 if err != nil {
435 return nil, err
436 }
437
438
439
440
441 list := make(ListedOffsets)
442
443 for _, td := range tds {
444 if td.Err != nil {
445 list[td.Topic] = map[int32]ListedOffset{
446 -1: {
447 Topic: td.Topic,
448 Partition: -1,
449 Err: td.Err,
450 },
451 }
452 }
453 }
454 rerequest := make(map[string][]int32)
455 shardfn := func(kr kmsg.Response) error {
456 resp := kr.(*kmsg.ListOffsetsResponse)
457 for _, t := range resp.Topics {
458 lt, ok := list[t.Topic]
459 if !ok {
460 lt = make(map[int32]ListedOffset)
461 list[t.Topic] = lt
462 }
463 for _, p := range t.Partitions {
464 if err := maybeAuthErr(p.ErrorCode); err != nil {
465 return err
466 }
467 lt[p.Partition] = ListedOffset{
468 Topic: t.Topic,
469 Partition: p.Partition,
470 Timestamp: p.Timestamp,
471 Offset: p.Offset,
472 LeaderEpoch: p.LeaderEpoch,
473 Err: kerr.ErrorForCode(p.ErrorCode),
474 }
475 if timestamp != -1 && p.Offset == -1 && p.ErrorCode == 0 {
476 rerequest[t.Topic] = append(rerequest[t.Topic], p.Partition)
477 }
478 }
479 }
480 return nil
481 }
482
483 req := kmsg.NewPtrListOffsetsRequest()
484 req.IsolationLevel = isolation
485 for t, td := range tds {
486 rt := kmsg.NewListOffsetsRequestTopic()
487 if td.Err != nil {
488 continue
489 }
490 rt.Topic = t
491 for p := range td.Partitions {
492 rp := kmsg.NewListOffsetsRequestTopicPartition()
493 rp.Partition = p
494 rp.Timestamp = timestamp
495 rt.Partitions = append(rt.Partitions, rp)
496 }
497 req.Topics = append(req.Topics, rt)
498 }
499 shards := cl.cl.RequestSharded(ctx, req)
500 err = shardErrEach(req, shards, shardfn)
501 if len(rerequest) > 0 {
502 req.Topics = req.Topics[:0]
503 for t, ps := range rerequest {
504 rt := kmsg.NewListOffsetsRequestTopic()
505 rt.Topic = t
506 for _, p := range ps {
507 rp := kmsg.NewListOffsetsRequestTopicPartition()
508 rp.Partition = p
509 rp.Timestamp = -1
510 rt.Partitions = append(rt.Partitions, rp)
511 }
512 req.Topics = append(req.Topics, rt)
513 }
514 shards = cl.cl.RequestSharded(ctx, req)
515 err = mergeShardErrs(err, shardErrEach(req, shards, shardfn))
516 }
517 return list, err
518 }
519
View as plain text