1 package kadm
2
3 import (
4 "context"
5 "crypto/rand"
6 "crypto/sha256"
7 "crypto/sha512"
8 "fmt"
9 "sort"
10 "strings"
11 "sync"
12
13 "golang.org/x/crypto/pbkdf2"
14
15 "github.com/twmb/franz-go/pkg/kerr"
16 "github.com/twmb/franz-go/pkg/kmsg"
17 "github.com/twmb/franz-go/pkg/kversion"
18 )
19
20
21
// FindCoordinatorResponse contains the result of looking up the coordinator
// for a single key.
type FindCoordinatorResponse struct {
	// Name is the key that was looked up.
	Name string
	// NodeID is the node ID of the coordinator for this key.
	NodeID int32
	// Host is the host of the coordinator for this key.
	Host string
	// Port is the port of the coordinator for this key.
	Port int32
	// Err is non-nil if the lookup for this key failed.
	Err error
	// ErrMessage is the error message returned with the response, if any.
	ErrMessage string
}

// FindCoordinatorResponses contains coordinator lookups keyed by the name
// that was looked up.
type FindCoordinatorResponses map[string]FindCoordinatorResponse

// AllFailed returns whether there is at least one response and every
// response has a non-nil Err.
func (rs FindCoordinatorResponses) AllFailed() bool {
	if len(rs) == 0 {
		return false
	}
	for _, r := range rs {
		if r.Err == nil {
			return false
		}
	}
	return true
}

// Sorted returns all responses ordered by Name.
func (rs FindCoordinatorResponses) Sorted() []FindCoordinatorResponse {
	sorted := make([]FindCoordinatorResponse, 0, len(rs))
	for _, r := range rs {
		sorted = append(sorted, r)
	}
	sort.Slice(sorted, func(a, b int) bool { return sorted[a].Name < sorted[b].Name })
	return sorted
}

// EachError calls fn for every response whose Err is non-nil. The call order
// follows map iteration and is therefore unspecified.
func (rs FindCoordinatorResponses) EachError(fn func(FindCoordinatorResponse)) {
	for _, r := range rs {
		if r.Err != nil {
			fn(r)
		}
	}
}

// Each calls fn for every response. The call order follows map iteration and
// is therefore unspecified.
func (rs FindCoordinatorResponses) Each(fn func(FindCoordinatorResponse)) {
	for _, r := range rs {
		fn(r)
	}
}

// Error returns the Err of a failed response, or nil if no response failed.
//
// When several responses failed, the one with the lexicographically smallest
// map key is chosen. Selecting by key instead of by map iteration order makes
// repeated calls on the same value return the same error.
func (rs FindCoordinatorResponses) Error() error {
	var (
		firstKey string
		firstErr error
	)
	for key, r := range rs {
		if r.Err == nil {
			continue
		}
		if firstErr == nil || key < firstKey {
			firstKey, firstErr = key, r.Err
		}
	}
	return firstErr
}

// Ok returns true if no response has an error. It is a shortcut for
// rs.Error() == nil.
func (rs FindCoordinatorResponses) Ok() bool {
	return rs.Error() == nil
}
84
85
86
87
88 func (cl *Client) FindGroupCoordinators(ctx context.Context, groups ...string) FindCoordinatorResponses {
89 return cl.findCoordinators(ctx, 0, groups...)
90 }
91
92
93
94
95
96 func (cl *Client) FindTxnCoordinators(ctx context.Context, txnIDs ...string) FindCoordinatorResponses {
97 return cl.findCoordinators(ctx, 1, txnIDs...)
98 }
99
100 func (cl *Client) findCoordinators(ctx context.Context, kind int8, names ...string) FindCoordinatorResponses {
101 resps := make(FindCoordinatorResponses)
102 if len(names) == 0 {
103 return resps
104 }
105
106 req := kmsg.NewPtrFindCoordinatorRequest()
107 req.CoordinatorType = kind
108 req.CoordinatorKeys = names
109
110 keyErr := func(k string, err error) {
111 resps[k] = FindCoordinatorResponse{
112 Name: k,
113 Err: err,
114 }
115 }
116 allKeysErr := func(req *kmsg.FindCoordinatorRequest, err error) {
117 for _, k := range req.CoordinatorKeys {
118 keyErr(k, err)
119 }
120 }
121
122 shards := cl.cl.RequestSharded(ctx, req)
123 for _, shard := range shards {
124 req := shard.Req.(*kmsg.FindCoordinatorRequest)
125 if shard.Err != nil {
126 allKeysErr(req, shard.Err)
127 continue
128 }
129 resp := shard.Resp.(*kmsg.FindCoordinatorResponse)
130 if err := maybeAuthErr(resp.ErrorCode); err != nil {
131 allKeysErr(req, err)
132 continue
133 }
134 for _, c := range resp.Coordinators {
135 if err := maybeAuthErr(c.ErrorCode); err != nil {
136 keyErr(c.Key, err)
137 continue
138 }
139 resps[c.Key] = FindCoordinatorResponse{
140 Name: c.Key,
141 NodeID: c.NodeID,
142 Host: c.Host,
143 Port: c.Port,
144 Err: kerr.ErrorForCode(c.ErrorCode),
145 ErrMessage: unptrStr(c.ErrorMessage),
146 }
147 }
148 }
149 return resps
150 }
151
152 type minmax struct {
153 min, max int16
154 }
155
156
157 type BrokerApiVersions struct {
158 NodeID int32
159
160 raw *kmsg.ApiVersionsResponse
161 keyVersions map[int16]minmax
162
163 Err error
164 }
165
166
167 func (v *BrokerApiVersions) Raw() *kmsg.ApiVersionsResponse {
168 return v.raw
169 }
170
171
172
173 func (v *BrokerApiVersions) KeyVersions(key int16) (min, max int16, exists bool) {
174 vs, exists := v.keyVersions[key]
175 return vs.min, vs.max, exists
176 }
177
178
179
180 func (v *BrokerApiVersions) KeyMinVersion(key int16) (min int16, exists bool) {
181 min, _, exists = v.KeyVersions(key)
182 return min, exists
183 }
184
185
186
187 func (v *BrokerApiVersions) KeyMaxVersion(key int16) (max int16, exists bool) {
188 _, max, exists = v.KeyVersions(key)
189 return max, exists
190 }
191
192
193
194 func (v *BrokerApiVersions) EachKeySorted(fn func(key, min, max int16)) {
195 type kmm struct {
196 k, min, max int16
197 }
198 kmms := make([]kmm, 0, len(v.keyVersions))
199 for key, minmax := range v.keyVersions {
200 kmms = append(kmms, kmm{key, minmax.min, minmax.max})
201 }
202 sort.Slice(kmms, func(i, j int) bool { return kmms[i].k < kmms[j].k })
203 for _, kmm := range kmms {
204 fn(kmm.k, kmm.min, kmm.max)
205 }
206 }
207
208
209
210
211
212
213
214 func (v *BrokerApiVersions) VersionGuess(opt ...kversion.VersionGuessOpt) string {
215 return kversion.FromApiVersionsResponse(v.raw).VersionGuess(opt...)
216 }
217
218
219
220 type BrokersApiVersions map[int32]BrokerApiVersions
221
222
223 func (vs BrokersApiVersions) Sorted() []BrokerApiVersions {
224 s := make([]BrokerApiVersions, 0, len(vs))
225 for _, v := range vs {
226 s = append(s, v)
227 }
228 sort.Slice(s, func(i, j int) bool { return s[i].NodeID < s[j].NodeID })
229 return s
230 }
231
232
233 func (vs BrokersApiVersions) Each(fn func(BrokerApiVersions)) {
234 for _, v := range vs {
235 fn(v)
236 }
237 }
238
239
240
241 func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error) {
242 m, err := cl.BrokerMetadata(ctx)
243 if err != nil {
244 return nil, err
245 }
246
247 var mu sync.Mutex
248 var wg sync.WaitGroup
249 vs := make(BrokersApiVersions, len(m.Brokers))
250 for _, n := range m.Brokers.NodeIDs() {
251 n := n
252 wg.Add(1)
253 go func() {
254 defer wg.Done()
255 req := kmsg.NewPtrApiVersionsRequest()
256 req.ClientSoftwareName = "kadm"
257 req.ClientSoftwareVersion = softwareVersion()
258 v := BrokerApiVersions{NodeID: n, keyVersions: make(map[int16]minmax)}
259 v.raw, v.Err = req.RequestWith(ctx, cl.cl.Broker(int(n)))
260
261 mu.Lock()
262 defer mu.Unlock()
263 defer func() { vs[n] = v }()
264 if v.Err != nil {
265 return
266 }
267
268 v.Err = kerr.ErrorForCode(v.raw.ErrorCode)
269 for _, k := range v.raw.ApiKeys {
270 v.keyVersions[k.ApiKey] = minmax{
271 min: k.MinVersion,
272 max: k.MaxVersion,
273 }
274 }
275 }()
276 }
277 wg.Wait()
278
279 return vs, nil
280 }
281
282
// ClientQuotaEntityComponent is one (type, name) component of a client quota
// entity.
type ClientQuotaEntityComponent struct {
	// Type is the entity type of this component.
	Type string
	// Name is the entity name; a nil Name is rendered as "<default>" by
	// String.
	Name *string
}

// String returns "Type=Name", or "Type=<default>" when Name is nil.
func (d ClientQuotaEntityComponent) String() string {
	name := "<default>"
	if d.Name != nil {
		name = *d.Name
	}
	return d.Type + "=" + name
}

// ClientQuotaEntity is the set of components that together make up one
// quota entity.
type ClientQuotaEntity []ClientQuotaEntityComponent

// String returns the components' String forms joined by ", " and wrapped in
// braces, e.g. "{user=foo, client-id=<default>}".
func (ds ClientQuotaEntity) String() string {
	var b strings.Builder
	b.WriteString("{")
	for i, d := range ds {
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString(d.String())
	}
	b.WriteString("}")
	return b.String()
}

// ClientQuotaValue is a single quota key and its value.
type ClientQuotaValue struct {
	// Key is the quota configuration key.
	Key string
	// Value is the quota configuration value.
	Value float64
}

// String returns "Key=Value", with Value formatted using %f.
func (d ClientQuotaValue) String() string {
	return d.Key + fmt.Sprintf("=%f", d.Value)
}

// ClientQuotaValues is a list of quota key/value pairs.
type ClientQuotaValues []ClientQuotaValue
322
323
324
325
326
327
328
329
330
331
332
333 type QuotasMatchType = kmsg.QuotasMatchType
334
335
336
337
338 type DescribeClientQuotaComponent struct {
339 Type string
340 MatchName *string
341 MatchType QuotasMatchType
342 }
343
344
345
346
347 type DescribedClientQuota struct {
348 Entity ClientQuotaEntity
349 Values ClientQuotaValues
350 }
351
352
353 type DescribedClientQuotas []DescribedClientQuota
354
355
356
357 func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, entityComponents []DescribeClientQuotaComponent) (DescribedClientQuotas, error) {
358 req := kmsg.NewPtrDescribeClientQuotasRequest()
359 req.Strict = strict
360 for _, entity := range entityComponents {
361 rc := kmsg.NewDescribeClientQuotasRequestComponent()
362 rc.EntityType = entity.Type
363 rc.Match = entity.MatchName
364 rc.MatchType = entity.MatchType
365 req.Components = append(req.Components, rc)
366 }
367 resp, err := req.RequestWith(ctx, cl.cl)
368 if err != nil {
369 return nil, err
370 }
371 if err := maybeAuthErr(resp.ErrorCode); err != nil {
372 return nil, err
373 }
374 if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
375 return nil, err
376 }
377 var qs DescribedClientQuotas
378 for _, entry := range resp.Entries {
379 var q DescribedClientQuota
380 for _, e := range entry.Entity {
381 q.Entity = append(q.Entity, ClientQuotaEntityComponent{
382 Type: e.Type,
383 Name: e.Name,
384 })
385 }
386 for _, v := range entry.Values {
387 q.Values = append(q.Values, ClientQuotaValue{
388 Key: v.Key,
389 Value: v.Value,
390 })
391 }
392 qs = append(qs, q)
393 }
394 return qs, nil
395 }
396
397
398 type AlterClientQuotaOp struct {
399 Key string
400 Value float64
401 Remove bool
402 }
403
404
405 type AlterClientQuotaEntry struct {
406 Entity ClientQuotaEntity
407 Ops []AlterClientQuotaOp
408 }
409
410
411 type AlteredClientQuota struct {
412 Entity ClientQuotaEntity
413 Err error
414 ErrMessage string
415 }
416
417
418 type AlteredClientQuotas []AlteredClientQuota
419
420
421
422 func (cl *Client) AlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error) {
423 return cl.alterClientQuotas(ctx, false, entries)
424 }
425
426
427
428
429 func (cl *Client) ValidateAlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error) {
430 return cl.alterClientQuotas(ctx, true, entries)
431 }
432
433 func (cl *Client) alterClientQuotas(ctx context.Context, validate bool, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error) {
434 req := kmsg.NewPtrAlterClientQuotasRequest()
435 req.ValidateOnly = validate
436 for _, entry := range entries {
437 re := kmsg.NewAlterClientQuotasRequestEntry()
438 for _, c := range entry.Entity {
439 rec := kmsg.NewAlterClientQuotasRequestEntryEntity()
440 rec.Type = c.Type
441 rec.Name = c.Name
442 re.Entity = append(re.Entity, rec)
443 }
444 for _, op := range entry.Ops {
445 reo := kmsg.NewAlterClientQuotasRequestEntryOp()
446 reo.Key = op.Key
447 reo.Value = op.Value
448 reo.Remove = op.Remove
449 re.Ops = append(re.Ops, reo)
450 }
451 req.Entries = append(req.Entries, re)
452 }
453 resp, err := req.RequestWith(ctx, cl.cl)
454 if err != nil {
455 return nil, err
456 }
457 var as AlteredClientQuotas
458 for _, entry := range resp.Entries {
459 var e ClientQuotaEntity
460 for _, c := range entry.Entity {
461 e = append(e, ClientQuotaEntityComponent{
462 Type: c.Type,
463 Name: c.Name,
464 })
465 }
466 a := AlteredClientQuota{
467 Entity: e,
468 Err: kerr.ErrorForCode(entry.ErrorCode),
469 ErrMessage: unptrStr(entry.ErrorMessage),
470 }
471 as = append(as, a)
472 }
473 return as, nil
474 }
475
476
// ScramMechanism identifies a SCRAM mechanism by its numeric value.
type ScramMechanism int8

const (
	// ScramSha256 is the mechanism with value 1; its String form is
	// "SCRAM-SHA-256".
	ScramSha256 ScramMechanism = 1
	// ScramSha512 is the mechanism with value 2; its String form is
	// "SCRAM-SHA-512".
	ScramSha512 ScramMechanism = 2
)

// String returns "SCRAM-SHA-256", "SCRAM-SHA-512", or "UNKNOWN" for any
// other value.
func (s ScramMechanism) String() string {
	switch s {
	case ScramSha256:
		return "SCRAM-SHA-256"
	case ScramSha512:
		return "SCRAM-SHA-512"
	default:
		return "UNKNOWN"
	}
}

// CredInfo describes one SCRAM credential of a user.
type CredInfo struct {
	// Mechanism is the SCRAM mechanism of the credential.
	Mechanism ScramMechanism
	// Iterations is the iteration count of the credential.
	Iterations int32
}

// String returns the credential as "MECHANISM=iterations=N".
func (c CredInfo) String() string {
	return fmt.Sprintf("%s=iterations=%d", c.Mechanism, c.Iterations)
}

// DescribedUserSCRAM contains the described SCRAM credentials of a single
// user.
type DescribedUserSCRAM struct {
	// User is the user this description is for.
	User string
	// CredInfos are the credentials reported for the user.
	CredInfos []CredInfo
	// Err is non-nil if describing this user failed.
	Err error
	// ErrMessage is the error message returned with the result, if any.
	ErrMessage string
}

// DescribedUserSCRAMs contains described user SCRAM credentials keyed by
// user.
type DescribedUserSCRAMs map[string]DescribedUserSCRAM

// Sorted returns all descriptions ordered by User.
func (ds DescribedUserSCRAMs) Sorted() []DescribedUserSCRAM {
	sorted := make([]DescribedUserSCRAM, 0, len(ds))
	for _, d := range ds {
		sorted = append(sorted, d)
	}
	sort.Slice(sorted, func(a, b int) bool { return sorted[a].User < sorted[b].User })
	return sorted
}

// AllFailed returns whether there is at least one description and every
// description has a non-nil Err.
func (ds DescribedUserSCRAMs) AllFailed() bool {
	if len(ds) == 0 {
		return false
	}
	for _, d := range ds {
		if d.Err == nil {
			return false
		}
	}
	return true
}

// EachError calls fn for every description whose Err is non-nil. The call
// order follows map iteration and is therefore unspecified.
func (ds DescribedUserSCRAMs) EachError(fn func(DescribedUserSCRAM)) {
	for _, d := range ds {
		if d.Err != nil {
			fn(d)
		}
	}
}

// Each calls fn for every description. The call order follows map iteration
// and is therefore unspecified.
func (ds DescribedUserSCRAMs) Each(fn func(DescribedUserSCRAM)) {
	for _, d := range ds {
		fn(d)
	}
}

// Error returns the Err of a failed description, or nil if none failed.
//
// When several descriptions failed, the one with the lexicographically
// smallest map key is chosen. Selecting by key instead of by map iteration
// order makes repeated calls on the same value return the same error.
func (ds DescribedUserSCRAMs) Error() error {
	var (
		firstKey string
		firstErr error
	)
	for key, d := range ds {
		if d.Err == nil {
			continue
		}
		if firstErr == nil || key < firstKey {
			firstKey, firstErr = key, d.Err
		}
	}
	return firstErr
}

// Ok returns true if no description has an error. It is a shortcut for
// ds.Error() == nil.
func (ds DescribedUserSCRAMs) Ok() bool {
	return ds.Error() == nil
}
573
574
575
576
577 func (cl *Client) DescribeUserSCRAMs(ctx context.Context, users ...string) (DescribedUserSCRAMs, error) {
578 req := kmsg.NewPtrDescribeUserSCRAMCredentialsRequest()
579 for _, u := range users {
580 ru := kmsg.NewDescribeUserSCRAMCredentialsRequestUser()
581 ru.Name = u
582 req.Users = append(req.Users, ru)
583 }
584 resp, err := req.RequestWith(ctx, cl.cl)
585 if err != nil {
586 return nil, err
587 }
588 if err := maybeAuthErr(resp.ErrorCode); err != nil {
589 return nil, err
590 }
591 if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
592 return nil, err
593 }
594 rs := make(DescribedUserSCRAMs)
595 for _, res := range resp.Results {
596 r := DescribedUserSCRAM{
597 User: res.User,
598 Err: kerr.ErrorForCode(res.ErrorCode),
599 ErrMessage: unptrStr(res.ErrorMessage),
600 }
601 for _, i := range res.CredentialInfos {
602 r.CredInfos = append(r.CredInfos, CredInfo{
603 Mechanism: ScramMechanism(i.Mechanism),
604 Iterations: i.Iterations,
605 })
606 }
607 rs[r.User] = r
608 }
609 return rs, nil
610 }
611
612
613 type DeleteSCRAM struct {
614 User string
615 Mechanism ScramMechanism
616 }
617
618
619
620
621
622
623 type UpsertSCRAM struct {
624 User string
625 Mechanism ScramMechanism
626 Iterations int32
627 Password string
628 Salt []byte
629 SaltedPassword []byte
630 }
631
632
// AlteredUserSCRAM is the result of altering the SCRAM credentials of a
// single user.
type AlteredUserSCRAM struct {
	// User is the user this result is for.
	User string
	// Err is non-nil if altering this user failed.
	Err error
	// ErrMessage is the error message returned with the result, if any.
	ErrMessage string
}

// AlteredUserSCRAMs contains alter results keyed by user.
type AlteredUserSCRAMs map[string]AlteredUserSCRAM

// Sorted returns all results ordered by User.
func (as AlteredUserSCRAMs) Sorted() []AlteredUserSCRAM {
	sorted := make([]AlteredUserSCRAM, 0, len(as))
	for _, a := range as {
		sorted = append(sorted, a)
	}
	sort.Slice(sorted, func(x, y int) bool { return sorted[x].User < sorted[y].User })
	return sorted
}

// AllFailed returns whether there is at least one result and every result
// has a non-nil Err.
func (as AlteredUserSCRAMs) AllFailed() bool {
	if len(as) == 0 {
		return false
	}
	for _, a := range as {
		if a.Err == nil {
			return false
		}
	}
	return true
}

// EachError calls fn for every result whose Err is non-nil. The call order
// follows map iteration and is therefore unspecified.
func (as AlteredUserSCRAMs) EachError(fn func(AlteredUserSCRAM)) {
	for _, a := range as {
		if a.Err != nil {
			fn(a)
		}
	}
}

// Each calls fn for every result. The call order follows map iteration and
// is therefore unspecified.
func (as AlteredUserSCRAMs) Each(fn func(AlteredUserSCRAM)) {
	for _, a := range as {
		fn(a)
	}
}

// Error returns the Err of a failed result, or nil if none failed.
//
// When several results failed, the one with the lexicographically smallest
// map key is chosen. Selecting by key instead of by map iteration order makes
// repeated calls on the same value return the same error.
func (as AlteredUserSCRAMs) Error() error {
	var (
		firstKey string
		firstErr error
	)
	for key, a := range as {
		if a.Err == nil {
			continue
		}
		if firstErr == nil || key < firstKey {
			firstKey, firstErr = key, a.Err
		}
	}
	return firstErr
}

// Ok returns true if no result has an error. It is a shortcut for
// as.Error() == nil.
func (as AlteredUserSCRAMs) Ok() bool {
	return as.Error() == nil
}
691
692
693
694
695
696 func (cl *Client) AlterUserSCRAMs(ctx context.Context, del []DeleteSCRAM, upsert []UpsertSCRAM) (AlteredUserSCRAMs, error) {
697 for i, u := range upsert {
698 if u.Password != "" {
699 if len(u.Salt) > 0 || len(u.SaltedPassword) > 0 {
700 return nil, fmt.Errorf("user %s: cannot specify both a password and a salt / salted password", u.User)
701 }
702 u.Salt = make([]byte, 24)
703 if _, err := rand.Read(u.Salt); err != nil {
704 return nil, fmt.Errorf("user %s: unable to generate salt: %v", u.User, err)
705 }
706 switch u.Mechanism {
707 case ScramSha256:
708 u.SaltedPassword = pbkdf2.Key([]byte(u.Password), u.Salt, int(u.Iterations), sha256.Size, sha256.New)
709 case ScramSha512:
710 u.SaltedPassword = pbkdf2.Key([]byte(u.Password), u.Salt, int(u.Iterations), sha512.Size, sha512.New)
711 default:
712 return nil, fmt.Errorf("user %s: unknown mechanism, unable to generate password", u.User)
713 }
714 upsert[i] = u
715 } else {
716 if len(u.Salt) == 0 || len(u.SaltedPassword) == 0 {
717 return nil, fmt.Errorf("user %s: must specify either a password or a salt and salted password", u.User)
718 }
719 }
720 }
721
722 req := kmsg.NewPtrAlterUserSCRAMCredentialsRequest()
723 for _, d := range del {
724 rd := kmsg.NewAlterUserSCRAMCredentialsRequestDeletion()
725 rd.Name = d.User
726 rd.Mechanism = int8(d.Mechanism)
727 req.Deletions = append(req.Deletions, rd)
728 }
729 for _, u := range upsert {
730 ru := kmsg.NewAlterUserSCRAMCredentialsRequestUpsertion()
731 ru.Name = u.User
732 ru.Mechanism = int8(u.Mechanism)
733 ru.Iterations = u.Iterations
734 ru.Salt = u.Salt
735 ru.SaltedPassword = u.SaltedPassword
736 req.Upsertions = append(req.Upsertions, ru)
737 }
738 resp, err := req.RequestWith(ctx, cl.cl)
739 if err != nil {
740 return nil, err
741 }
742 rs := make(AlteredUserSCRAMs)
743 for _, res := range resp.Results {
744 if err := maybeAuthErr(res.ErrorCode); err != nil {
745 return nil, err
746 }
747 r := AlteredUserSCRAM{
748 User: res.User,
749 Err: kerr.ErrorForCode(res.ErrorCode),
750 ErrMessage: unptrStr(res.ErrorMessage),
751 }
752 rs[r.User] = r
753 }
754 return rs, nil
755 }
756
757
// ElectLeadersHow selects the kind of leader election to request. Its value
// is sent as the election type of an ElectLeaders request.
type ElectLeadersHow int8

const (
	// ElectPreferredReplica is election type 0.
	ElectPreferredReplica ElectLeadersHow = 0
	// ElectLiveReplica is election type 1.
	ElectLiveReplica ElectLeadersHow = 1
)

// ElectLeadersResult is the result of an election for a single partition.
type ElectLeadersResult struct {
	// Topic is the topic this result is for.
	Topic string
	// Partition is the partition this result is for.
	Partition int32
	// How is the election type this result is attributed to.
	How ElectLeadersHow
	// Err is non-nil if the election for this partition failed.
	Err error
	// ErrMessage is the error message returned with the result, if any.
	ErrMessage string
}

// ElectLeadersResults contains per-partition election results, keyed by
// topic and then by partition.
type ElectLeadersResults map[string]map[int32]ElectLeadersResult
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795 func (cl *Client) ElectLeaders(ctx context.Context, how ElectLeadersHow, s TopicsSet) (ElectLeadersResults, error) {
796 req := kmsg.NewPtrElectLeadersRequest()
797 req.ElectionType = int8(how)
798 for _, t := range s.IntoList() {
799 rt := kmsg.NewElectLeadersRequestTopic()
800 rt.Topic = t.Topic
801 rt.Partitions = t.Partitions
802 req.Topics = append(req.Topics, rt)
803 }
804 resp, err := req.RequestWith(ctx, cl.cl)
805 if err != nil {
806 return nil, err
807 }
808 if err := maybeAuthErr(resp.ErrorCode); err != nil {
809 return nil, err
810 }
811 if resp.Version == 0 {
812 how = ElectPreferredReplica
813 }
814 rs := make(ElectLeadersResults)
815 for _, t := range resp.Topics {
816 rt := make(map[int32]ElectLeadersResult)
817 rs[t.Topic] = rt
818 for _, p := range t.Partitions {
819 if err := maybeAuthErr(p.ErrorCode); err != nil {
820 return nil, err
821 }
822 rt[p.Partition] = ElectLeadersResult{
823 Topic: t.Topic,
824 Partition: p.Partition,
825 How: how,
826 Err: kerr.ErrorForCode(p.ErrorCode),
827 ErrMessage: unptrStr(p.ErrorMessage),
828 }
829 }
830 }
831 return rs, nil
832 }
833
834
835
// OffsetForLeaderEpochRequest contains the leader epoch to request for each
// partition, keyed by topic and then by partition.
type OffsetForLeaderEpochRequest map[string]map[int32]int32

// Add records leaderEpoch for the given topic and partition, replacing any
// epoch previously recorded for that partition. Both the receiver map and
// the per-topic map are allocated on demand, so Add may be called on a nil
// OffsetForLeaderEpochRequest.
func (l *OffsetForLeaderEpochRequest) Add(topic string, partition, leaderEpoch int32) {
	topics := *l
	if topics == nil {
		topics = make(map[string]map[int32]int32)
		*l = topics
	}
	if topics[topic] == nil {
		topics[topic] = make(map[int32]int32)
	}
	topics[topic][partition] = leaderEpoch
}
850
851
852
// OffsetForLeaderEpoch contains the response for a single partition of an
// OffsetForLeaderEpoch request.
type OffsetForLeaderEpoch struct {
	// NodeID is the node that answered for this partition.
	NodeID int32
	// Topic is the topic this response is for.
	Topic string
	// Partition is the partition this response is for.
	Partition int32

	// LeaderEpoch is the leader epoch the broker returned for this
	// partition. See the Kafka OffsetForLeaderEpoch protocol documentation
	// for how it relates to the requested epoch.
	LeaderEpoch int32

	// EndOffset is the end offset the broker returned for this partition.
	// See the Kafka OffsetForLeaderEpoch protocol documentation for how it
	// relates to the requested epoch.
	EndOffset int64

	// Err is non-nil if the request for this partition failed.
	Err error
}

// OffsetsForLeaderEpochs contains per-partition responses, keyed by topic
// and then by partition.
type OffsetsForLeaderEpochs map[string]map[int32]OffsetForLeaderEpoch
893
894
895
896
897
898
899
900 func (cl *Client) OffetForLeaderEpoch(ctx context.Context, r OffsetForLeaderEpochRequest) (OffsetsForLeaderEpochs, error) {
901 req := kmsg.NewPtrOffsetForLeaderEpochRequest()
902 for t, ps := range r {
903 rt := kmsg.NewOffsetForLeaderEpochRequestTopic()
904 rt.Topic = t
905 for p, e := range ps {
906 rp := kmsg.NewOffsetForLeaderEpochRequestTopicPartition()
907 rp.Partition = p
908 rp.LeaderEpoch = e
909 rt.Partitions = append(rt.Partitions, rp)
910 }
911 req.Topics = append(req.Topics, rt)
912 }
913 shards := cl.cl.RequestSharded(ctx, req)
914 ls := make(OffsetsForLeaderEpochs)
915 return ls, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
916 resp := kr.(*kmsg.OffsetForLeaderEpochResponse)
917 for _, rt := range resp.Topics {
918 lps, exists := ls[rt.Topic]
919 if !exists {
920 lps = make(map[int32]OffsetForLeaderEpoch)
921 ls[rt.Topic] = lps
922 }
923 for _, rp := range rt.Partitions {
924 if err := maybeAuthErr(rp.ErrorCode); err != nil {
925 return err
926 }
927 lps[rp.Partition] = OffsetForLeaderEpoch{
928 NodeID: b.NodeID,
929 Topic: rt.Topic,
930 Partition: rp.Partition,
931 LeaderEpoch: rp.LeaderEpoch,
932 EndOffset: rp.EndOffset,
933 Err: kerr.ErrorForCode(rp.ErrorCode),
934 }
935 }
936 }
937 return nil
938 })
939 }
940
View as plain text