1 package kadm
2
3 import (
4 "context"
5 "errors"
6 "sort"
7
8 "github.com/twmb/franz-go/pkg/kerr"
9 "github.com/twmb/franz-go/pkg/kmsg"
10 )
11
12
13
14 type DescribedProducer struct {
15 Leader int32
16 Topic string
17 Partition int32
18 ProducerID int64
19 ProducerEpoch int16
20 LastSequence int32
21 LastTimestamp int64
22 CoordinatorEpoch int32
23 CurrentTxnStartOffset int64
24 }
25
26
27
28
29
30
31
32
33
34
35 func (l *DescribedProducer) Less(r *DescribedProducer) bool {
36 if l.Topic < r.Topic {
37 return true
38 }
39 if l.Topic > r.Topic {
40 return false
41 }
42 if l.Partition < r.Partition {
43 return true
44 }
45 if l.Partition > r.Partition {
46 return false
47 }
48 if l.ProducerID < r.ProducerID {
49 return true
50 }
51 if l.ProducerID > r.ProducerID {
52 return false
53 }
54 if l.ProducerEpoch < r.ProducerEpoch {
55 return true
56 }
57 if l.ProducerEpoch > r.ProducerEpoch {
58 return false
59 }
60 if l.LastTimestamp < r.LastTimestamp {
61 return true
62 }
63 if l.LastTimestamp > r.LastTimestamp {
64 return false
65 }
66 return l.LastSequence < r.LastSequence
67 }
68
69
70 type DescribedProducers map[int64]DescribedProducer
71
72
73
74 func (ds DescribedProducers) Sorted() []DescribedProducer {
75 var all []DescribedProducer
76 for _, d := range ds {
77 all = append(all, d)
78 }
79 sort.Slice(all, func(i, j int) bool {
80 l, r := all[i], all[j]
81 return l.Topic < r.Topic || l.Topic == r.Topic && (l.Partition < r.Partition || l.Partition == r.Partition && l.ProducerID < r.ProducerID)
82 })
83 return all
84 }
85
86
87 func (ds DescribedProducers) Each(fn func(DescribedProducer)) {
88 for _, d := range ds {
89 fn(d)
90 }
91 }
92
93
94 type DescribedProducersPartition struct {
95 Leader int32
96 Topic string
97 Partition int32
98 ActiveProducers DescribedProducers
99 Err error
100 }
101
102
103 type DescribedProducersPartitions map[int32]DescribedProducersPartition
104
105
106 func (ds DescribedProducersPartitions) Sorted() []DescribedProducersPartition {
107 var all []DescribedProducersPartition
108 for _, d := range ds {
109 all = append(all, d)
110 }
111 sort.Slice(all, func(i, j int) bool {
112 l, r := all[i], all[j]
113 return l.Topic < r.Topic || l.Topic == r.Topic && l.Partition < r.Partition
114 })
115 return all
116 }
117
118
119 func (ds DescribedProducersPartitions) SortedProducers() []DescribedProducer {
120 var all []DescribedProducer
121 ds.EachProducer(func(d DescribedProducer) {
122 all = append(all, d)
123 })
124 sort.Slice(all, func(i, j int) bool {
125 l, r := all[i], all[j]
126 return l.Topic < r.Topic || l.Topic == r.Topic && (l.Partition < r.Partition || l.Partition == r.Partition && l.ProducerID < r.ProducerID)
127 })
128 return all
129 }
130
131
132 func (ds DescribedProducersPartitions) Each(fn func(DescribedProducersPartition)) {
133 for _, d := range ds {
134 fn(d)
135 }
136 }
137
138
139 func (ds DescribedProducersPartitions) EachProducer(fn func(DescribedProducer)) {
140 for _, d := range ds {
141 for _, p := range d.ActiveProducers {
142 fn(p)
143 }
144 }
145 }
146
147
148 type DescribedProducersTopic struct {
149 Topic string
150 Partitions DescribedProducersPartitions
151 }
152
153
154 type DescribedProducersTopics map[string]DescribedProducersTopic
155
156
157 func (ds DescribedProducersTopics) Sorted() []DescribedProducersTopic {
158 var all []DescribedProducersTopic
159 ds.Each(func(d DescribedProducersTopic) {
160 all = append(all, d)
161 })
162 sort.Slice(all, func(i, j int) bool {
163 l, r := all[i], all[j]
164 return l.Topic < r.Topic
165 })
166 return all
167 }
168
169
170 func (ds DescribedProducersTopics) SortedPartitions() []DescribedProducersPartition {
171 var all []DescribedProducersPartition
172 ds.EachPartition(func(d DescribedProducersPartition) {
173 all = append(all, d)
174 })
175 sort.Slice(all, func(i, j int) bool {
176 l, r := all[i], all[j]
177 return l.Topic < r.Topic || l.Topic == r.Topic && l.Partition < r.Partition
178 })
179 return all
180 }
181
182
183 func (ds DescribedProducersTopics) SortedProducers() []DescribedProducer {
184 var all []DescribedProducer
185 ds.EachProducer(func(d DescribedProducer) {
186 all = append(all, d)
187 })
188 sort.Slice(all, func(i, j int) bool {
189 l, r := all[i], all[j]
190 return l.Topic < r.Topic || l.Topic == r.Topic && (l.Partition < r.Partition || l.Partition == r.Partition && l.ProducerID < r.ProducerID)
191 })
192 return all
193 }
194
195
196 func (ds DescribedProducersTopics) Each(fn func(DescribedProducersTopic)) {
197 for _, d := range ds {
198 fn(d)
199 }
200 }
201
202
203 func (ds DescribedProducersTopics) EachPartition(fn func(DescribedProducersPartition)) {
204 for _, d := range ds {
205 for _, p := range d.Partitions {
206 fn(p)
207 }
208 }
209 }
210
211
212 func (ds DescribedProducersTopics) EachProducer(fn func(DescribedProducer)) {
213 for _, d := range ds {
214 for _, p := range d.Partitions {
215 for _, b := range p.ActiveProducers {
216 fn(b)
217 }
218 }
219 }
220 }
221
222
223
224
225
226
227
228 func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (DescribedProducersTopics, error) {
229 if len(s) == 0 {
230 m, err := cl.Metadata(ctx)
231 if err != nil {
232 return nil, err
233 }
234 s = m.Topics.TopicsSet()
235 } else if e := s.EmptyTopics(); len(e) > 0 {
236 m, err := cl.Metadata(ctx, e...)
237 if err != nil {
238 return nil, err
239 }
240 for t, ps := range m.Topics.TopicsSet() {
241 s[t] = ps
242 }
243 }
244
245 req := kmsg.NewPtrDescribeProducersRequest()
246 for _, t := range s.IntoList() {
247 rt := kmsg.NewDescribeProducersRequestTopic()
248 rt.Topic = t.Topic
249 rt.Partitions = t.Partitions
250 req.Topics = append(req.Topics, rt)
251 }
252 shards := cl.cl.RequestSharded(ctx, req)
253 dts := make(DescribedProducersTopics)
254 return dts, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
255 resp := kr.(*kmsg.DescribeProducersResponse)
256 for _, rt := range resp.Topics {
257 dt, exists := dts[rt.Topic]
258 if !exists {
259 dt = DescribedProducersTopic{
260 Topic: rt.Topic,
261 Partitions: make(DescribedProducersPartitions),
262 }
263 dts[rt.Topic] = dt
264 }
265 dps := dt.Partitions
266 for _, rp := range rt.Partitions {
267 if err := maybeAuthErr(rp.ErrorCode); err != nil {
268 return err
269 }
270 drs := make(DescribedProducers)
271 dp := DescribedProducersPartition{
272 Leader: b.NodeID,
273 Topic: rt.Topic,
274 Partition: rp.Partition,
275 ActiveProducers: drs,
276 Err: kerr.ErrorForCode(rp.ErrorCode),
277 }
278 dps[rp.Partition] = dp
279 for _, rr := range rp.ActiveProducers {
280 dr := DescribedProducer{
281 Leader: b.NodeID,
282 Topic: rt.Topic,
283 Partition: rp.Partition,
284 ProducerID: rr.ProducerID,
285 ProducerEpoch: int16(rr.ProducerEpoch),
286 LastSequence: rr.LastSequence,
287 LastTimestamp: rr.LastTimestamp,
288 CoordinatorEpoch: rr.CoordinatorEpoch,
289 CurrentTxnStartOffset: rr.CurrentTxnStartOffset,
290 }
291 drs[dr.ProducerID] = dr
292 }
293 }
294 }
295 return nil
296 })
297 }
298
299
300
301 type DescribedTransaction struct {
302 Coordinator int32
303 TxnID string
304 State string
305 TimeoutMillis int32
306 StartTimestamp int64
307 ProducerID int64
308 ProducerEpoch int16
309
310
311
312
313
314 Topics TopicsSet
315
316 Err error
317 }
318
319
320
321 type DescribedTransactions map[string]DescribedTransaction
322
323
324 func (ds DescribedTransactions) Sorted() []DescribedTransaction {
325 s := make([]DescribedTransaction, 0, len(ds))
326 for _, d := range ds {
327 s = append(s, d)
328 }
329 sort.Slice(s, func(i, j int) bool { return s[i].TxnID < s[j].TxnID })
330 return s
331 }
332
333
334 func (ds DescribedTransactions) Each(fn func(DescribedTransaction)) {
335 for _, d := range ds {
336 fn(d)
337 }
338 }
339
340
341
342
343
344
345
346
347
348
349
350
351 func (rs DescribedTransactions) On(txnID string, fn func(*DescribedTransaction) error) (DescribedTransaction, error) {
352 if len(rs) > 0 {
353 r, ok := rs[txnID]
354 if ok {
355 if fn == nil {
356 return r, nil
357 }
358 return r, fn(&r)
359 }
360 }
361 return DescribedTransaction{}, kerr.TransactionalIDNotFound
362 }
363
364
365 func (ds DescribedTransactions) TransactionalIDs() []string {
366 all := make([]string, 0, len(ds))
367 for t := range ds {
368 all = append(all, t)
369 }
370 sort.Strings(all)
371 return all
372 }
373
374
375
376
377
378
379
380
381
382
383
384
385
386 func (cl *Client) DescribeTransactions(ctx context.Context, txnIDs ...string) (DescribedTransactions, error) {
387 var seList *ShardErrors
388 if len(txnIDs) == 0 {
389 listed, err := cl.ListTransactions(ctx, nil, nil)
390 switch {
391 case err == nil:
392 case errors.As(err, &seList):
393 default:
394 return nil, err
395 }
396 txnIDs = listed.TransactionalIDs()
397 if len(txnIDs) == 0 {
398 return nil, err
399 }
400 }
401
402 req := kmsg.NewPtrDescribeTransactionsRequest()
403 req.TransactionalIDs = txnIDs
404
405 shards := cl.cl.RequestSharded(ctx, req)
406 described := make(DescribedTransactions)
407 err := shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
408 resp := kr.(*kmsg.DescribeTransactionsResponse)
409 for _, rt := range resp.TransactionStates {
410 if err := maybeAuthErr(rt.ErrorCode); err != nil {
411 return err
412 }
413 t := DescribedTransaction{
414 Coordinator: b.NodeID,
415 TxnID: rt.TransactionalID,
416 State: rt.State,
417 TimeoutMillis: rt.TimeoutMillis,
418 StartTimestamp: rt.StartTimestamp,
419 ProducerID: rt.ProducerID,
420 ProducerEpoch: rt.ProducerEpoch,
421 Err: kerr.ErrorForCode(rt.ErrorCode),
422 }
423 for _, rtt := range rt.Topics {
424 t.Topics.Add(rtt.Topic, rtt.Partitions...)
425 }
426 described[t.TxnID] = t
427 }
428 return nil
429 })
430
431 var seDesc *ShardErrors
432 switch {
433 case err == nil:
434 return described, seList.into()
435 case errors.As(err, &seDesc):
436 if seList != nil {
437 seDesc.Errs = append(seList.Errs, seDesc.Errs...)
438 }
439 return described, seDesc.into()
440 default:
441 return nil, err
442 }
443 }
444
445
446
447 type ListedTransaction struct {
448 Coordinator int32
449 TxnID string
450 ProducerID int64
451 State string
452 }
453
454
455 type ListedTransactions map[string]ListedTransaction
456
457
458 func (ls ListedTransactions) Sorted() []ListedTransaction {
459 s := make([]ListedTransaction, 0, len(ls))
460 for _, l := range ls {
461 s = append(s, l)
462 }
463 sort.Slice(s, func(i, j int) bool { return s[i].TxnID < s[j].TxnID })
464 return s
465 }
466
467
468 func (ls ListedTransactions) Each(fn func(ListedTransaction)) {
469 for _, l := range ls {
470 fn(l)
471 }
472 }
473
474
475 func (ls ListedTransactions) TransactionalIDs() []string {
476 all := make([]string, 0, len(ls))
477 for t := range ls {
478 all = append(all, t)
479 }
480 sort.Strings(all)
481 return all
482 }
483
484
485
486
487
488
489
490
491 func (cl *Client) ListTransactions(ctx context.Context, producerIDs []int64, filterStates []string) (ListedTransactions, error) {
492 req := kmsg.NewPtrListTransactionsRequest()
493 req.ProducerIDFilters = producerIDs
494 req.StateFilters = filterStates
495 shards := cl.cl.RequestSharded(ctx, req)
496 list := make(ListedTransactions)
497 return list, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
498 resp := kr.(*kmsg.ListTransactionsResponse)
499 if err := maybeAuthErr(resp.ErrorCode); err != nil {
500 return err
501 }
502 if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
503 return err
504 }
505 for _, t := range resp.TransactionStates {
506 list[t.TransactionalID] = ListedTransaction{
507 Coordinator: b.NodeID,
508 TxnID: t.TransactionalID,
509 ProducerID: t.ProducerID,
510 State: t.TransactionState,
511 }
512 }
513 return nil
514 })
515 }
516
517
518
519
520
521
522
523
524 type TxnMarkers struct {
525 ProducerID int64
526 ProducerEpoch int16
527 Commit bool
528 CoordinatorEpoch int32
529 Topics TopicsSet
530 }
531
532
533
534 type TxnMarkersPartitionResponse struct {
535 NodeID int32
536 ProducerID int64
537 Topic string
538 Partition int32
539 Err error
540 }
541
542
543
544 type TxnMarkersPartitionResponses map[int32]TxnMarkersPartitionResponse
545
546
547 func (ps TxnMarkersPartitionResponses) Sorted() []TxnMarkersPartitionResponse {
548 var all []TxnMarkersPartitionResponse
549 ps.Each(func(p TxnMarkersPartitionResponse) {
550 all = append(all, p)
551 })
552 sort.Slice(all, func(i, j int) bool {
553 l, r := all[i], all[j]
554 return l.Partition < r.Partition
555 })
556 return all
557 }
558
559
560 func (ps TxnMarkersPartitionResponses) Each(fn func(TxnMarkersPartitionResponse)) {
561 for _, p := range ps {
562 fn(p)
563 }
564 }
565
566
567
568 type TxnMarkersTopicResponse struct {
569 ProducerID int64
570 Topic string
571 Partitions TxnMarkersPartitionResponses
572 }
573
574
575
576 type TxnMarkersTopicResponses map[string]TxnMarkersTopicResponse
577
578
579 func (ts TxnMarkersTopicResponses) Sorted() []TxnMarkersTopicResponse {
580 var all []TxnMarkersTopicResponse
581 ts.Each(func(t TxnMarkersTopicResponse) {
582 all = append(all, t)
583 })
584 sort.Slice(all, func(i, j int) bool {
585 l, r := all[i], all[j]
586 return l.Topic < r.Topic
587 })
588 return all
589 }
590
591
592 func (ts TxnMarkersTopicResponses) SortedPartitions() []TxnMarkersPartitionResponse {
593 var all []TxnMarkersPartitionResponse
594 ts.EachPartition(func(p TxnMarkersPartitionResponse) {
595 all = append(all, p)
596 })
597 sort.Slice(all, func(i, j int) bool {
598 l, r := all[i], all[j]
599 return l.Topic < r.Topic || l.Topic == r.Topic && l.Partition < r.Partition
600 })
601 return all
602 }
603
604
605 func (ts TxnMarkersTopicResponses) Each(fn func(TxnMarkersTopicResponse)) {
606 for _, t := range ts {
607 fn(t)
608 }
609 }
610
611
612 func (ts TxnMarkersTopicResponses) EachPartition(fn func(TxnMarkersPartitionResponse)) {
613 for _, t := range ts {
614 for _, p := range t.Partitions {
615 fn(p)
616 }
617 }
618 }
619
620
621 type TxnMarkersResponse struct {
622 ProducerID int64
623 Topics TxnMarkersTopicResponses
624 }
625
626
627
628 type TxnMarkersResponses map[int64]TxnMarkersResponse
629
630
631 func (ms TxnMarkersResponses) Sorted() []TxnMarkersResponse {
632 var all []TxnMarkersResponse
633 ms.Each(func(m TxnMarkersResponse) {
634 all = append(all, m)
635 })
636 sort.Slice(all, func(i, j int) bool {
637 l, r := all[i], all[j]
638 return l.ProducerID < r.ProducerID
639 })
640 return all
641 }
642
643
644 func (ms TxnMarkersResponses) SortedTopics() []TxnMarkersTopicResponse {
645 var all []TxnMarkersTopicResponse
646 ms.EachTopic(func(t TxnMarkersTopicResponse) {
647 all = append(all, t)
648 })
649 sort.Slice(all, func(i, j int) bool {
650 l, r := all[i], all[j]
651 return l.ProducerID < r.ProducerID || l.ProducerID == r.ProducerID && l.Topic < r.Topic
652 })
653 return all
654 }
655
656
657
658 func (ms TxnMarkersResponses) SortedPartitions() []TxnMarkersPartitionResponse {
659 var all []TxnMarkersPartitionResponse
660 ms.EachPartition(func(p TxnMarkersPartitionResponse) {
661 all = append(all, p)
662 })
663 sort.Slice(all, func(i, j int) bool {
664 l, r := all[i], all[j]
665 return l.ProducerID < r.ProducerID || l.ProducerID == r.ProducerID && l.Topic < r.Topic || l.Topic == r.Topic && l.Partition < r.Partition
666 })
667 return all
668 }
669
670
671 func (ms TxnMarkersResponses) Each(fn func(TxnMarkersResponse)) {
672 for _, m := range ms {
673 fn(m)
674 }
675 }
676
677
678 func (ms TxnMarkersResponses) EachTopic(fn func(TxnMarkersTopicResponse)) {
679 for _, m := range ms {
680 for _, t := range m.Topics {
681 fn(t)
682 }
683 }
684 }
685
686
687
688 func (ms TxnMarkersResponses) EachPartition(fn func(TxnMarkersPartitionResponse)) {
689 for _, m := range ms {
690 for _, t := range m.Topics {
691 for _, p := range t.Partitions {
692 fn(p)
693 }
694 }
695 }
696 }
697
698
699
700
701
702 func (cl *Client) WriteTxnMarkers(ctx context.Context, markers ...TxnMarkers) (TxnMarkersResponses, error) {
703 req := kmsg.NewPtrWriteTxnMarkersRequest()
704 for _, m := range markers {
705 rm := kmsg.NewWriteTxnMarkersRequestMarker()
706 rm.ProducerID = m.ProducerID
707 rm.ProducerEpoch = m.ProducerEpoch
708 rm.Committed = m.Commit
709 rm.CoordinatorEpoch = m.CoordinatorEpoch
710 for t, ps := range m.Topics {
711 rt := kmsg.NewWriteTxnMarkersRequestMarkerTopic()
712 rt.Topic = t
713 for p := range ps {
714 rt.Partitions = append(rt.Partitions, p)
715 }
716 rm.Topics = append(rm.Topics, rt)
717 }
718 req.Markers = append(req.Markers, rm)
719 }
720 shards := cl.cl.RequestSharded(ctx, req)
721 rs := make(TxnMarkersResponses)
722 return rs, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
723 resp := kr.(*kmsg.WriteTxnMarkersResponse)
724 for _, rm := range resp.Markers {
725 m, exists := rs[rm.ProducerID]
726 if !exists {
727 m = TxnMarkersResponse{
728 ProducerID: rm.ProducerID,
729 Topics: make(TxnMarkersTopicResponses),
730 }
731 rs[rm.ProducerID] = m
732 }
733 for _, rt := range rm.Topics {
734 t, exists := m.Topics[rt.Topic]
735 if !exists {
736 t = TxnMarkersTopicResponse{
737 ProducerID: rm.ProducerID,
738 Topic: rt.Topic,
739 Partitions: make(TxnMarkersPartitionResponses),
740 }
741 m.Topics[rt.Topic] = t
742 }
743 for _, rp := range rt.Partitions {
744 if err := maybeAuthErr(rp.ErrorCode); err != nil {
745 return err
746 }
747 t.Partitions[rp.Partition] = TxnMarkersPartitionResponse{
748 NodeID: b.NodeID,
749 ProducerID: rm.ProducerID,
750 Topic: rt.Topic,
751 Partition: rp.Partition,
752 Err: kerr.ErrorForCode(rp.ErrorCode),
753 }
754 }
755 }
756 }
757 return nil
758 })
759 }
760
View as plain text