1 package kadm
2
3 import (
4 "context"
5 "sort"
6
7 "github.com/twmb/franz-go/pkg/kerr"
8 "github.com/twmb/franz-go/pkg/kmsg"
9 )
10
11
12
13
14 type AlterReplicaLogDirsReq map[string]TopicsSet
15
16
17 func (r *AlterReplicaLogDirsReq) Add(d string, s TopicsSet) {
18 if *r == nil {
19 *r = make(map[string]TopicsSet)
20 }
21 existing := (*r)[d]
22 if existing == nil {
23 existing = make(TopicsSet)
24 (*r)[d] = existing
25 }
26 existing.Merge(s)
27 }
28
29 func (r AlterReplicaLogDirsReq) req() *kmsg.AlterReplicaLogDirsRequest {
30 req := kmsg.NewPtrAlterReplicaLogDirsRequest()
31 for dir, ts := range r {
32 rd := kmsg.NewAlterReplicaLogDirsRequestDir()
33 rd.Dir = dir
34 for t, ps := range ts {
35 rt := kmsg.NewAlterReplicaLogDirsRequestDirTopic()
36 rt.Topic = t
37 for p := range ps {
38 rt.Partitions = append(rt.Partitions, p)
39 }
40 rd.Topics = append(rd.Topics, rt)
41 }
42 req.Dirs = append(req.Dirs, rd)
43 }
44 return req
45 }
46
47 func (r AlterReplicaLogDirsReq) dirfor(t string, p int32) string {
48 for d, dts := range r {
49 if dts == nil {
50 continue
51 }
52 dtps, ok := dts[t]
53 if !ok {
54 continue
55 }
56 if _, ok = dtps[p]; !ok {
57 continue
58 }
59 return d
60 }
61 return ""
62 }
63
64
65
66 type AlterAllReplicaLogDirsResponses map[int32]AlterReplicaLogDirsResponses
67
68
69 func (rs AlterAllReplicaLogDirsResponses) Sorted() []AlterReplicaLogDirsResponse {
70 var all []AlterReplicaLogDirsResponse
71 rs.Each(func(r AlterReplicaLogDirsResponse) {
72 all = append(all, r)
73 })
74 sort.Slice(all, func(i, j int) bool { return all[i].Less(all[j]) })
75 return all
76 }
77
78
79 func (rs AlterAllReplicaLogDirsResponses) Each(fn func(AlterReplicaLogDirsResponse)) {
80 for _, ts := range rs {
81 ts.Each(fn)
82 }
83 }
84
85
86
87 type AlterReplicaLogDirsResponses map[string]map[int32]AlterReplicaLogDirsResponse
88
89
90 func (rs AlterReplicaLogDirsResponses) Sorted() []AlterReplicaLogDirsResponse {
91 var all []AlterReplicaLogDirsResponse
92 rs.Each(func(r AlterReplicaLogDirsResponse) {
93 all = append(all, r)
94 })
95 sort.Slice(all, func(i, j int) bool { return all[i].Less(all[j]) })
96 return all
97 }
98
99
100 func (rs AlterReplicaLogDirsResponses) Each(fn func(AlterReplicaLogDirsResponse)) {
101 for _, ps := range rs {
102 for _, r := range ps {
103 fn(r)
104 }
105 }
106 }
107
108
109
110 type AlterReplicaLogDirsResponse struct {
111 Broker int32
112 Dir string
113 Topic string
114 Partition int32
115 Err error
116 }
117
118
119
120 func (a AlterReplicaLogDirsResponse) Less(other AlterReplicaLogDirsResponse) bool {
121 if a.Broker < other.Broker {
122 return true
123 }
124 if a.Broker > other.Broker {
125 return false
126 }
127 if a.Dir < other.Dir {
128 return true
129 }
130 if a.Dir > other.Dir {
131 return false
132 }
133 if a.Topic < other.Topic {
134 return true
135 }
136 if a.Topic > other.Topic {
137 return false
138 }
139 return a.Partition < other.Partition
140 }
141
142 func newAlterLogDirsResp(node int32, req AlterReplicaLogDirsReq, resp *kmsg.AlterReplicaLogDirsResponse) AlterReplicaLogDirsResponses {
143 a := make(AlterReplicaLogDirsResponses)
144 for _, kt := range resp.Topics {
145 ps := make(map[int32]AlterReplicaLogDirsResponse)
146 a[kt.Topic] = ps
147 for _, kp := range kt.Partitions {
148 ps[kp.Partition] = AlterReplicaLogDirsResponse{
149 Broker: node,
150 Dir: req.dirfor(kt.Topic, kp.Partition),
151 Topic: kt.Topic,
152 Partition: kp.Partition,
153 Err: kerr.ErrorForCode(kp.ErrorCode),
154 }
155 }
156 }
157 return a
158 }
159
160
161
162
163
164
165 func (cl *Client) AlterAllReplicaLogDirs(ctx context.Context, alter AlterReplicaLogDirsReq) (AlterAllReplicaLogDirsResponses, error) {
166 if len(alter) == 0 {
167 return make(AlterAllReplicaLogDirsResponses), nil
168 }
169 req := alter.req()
170 shards := cl.cl.RequestSharded(ctx, req)
171 resps := make(AlterAllReplicaLogDirsResponses)
172 return resps, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
173 resp := kr.(*kmsg.AlterReplicaLogDirsResponse)
174 resps[b.NodeID] = newAlterLogDirsResp(b.NodeID, alter, resp)
175 return nil
176 })
177 }
178
179
180
181 func (cl *Client) AlterBrokerReplicaLogDirs(ctx context.Context, broker int32, alter AlterReplicaLogDirsReq) (AlterReplicaLogDirsResponses, error) {
182 if len(alter) == 0 {
183 return make(AlterReplicaLogDirsResponses), nil
184 }
185 b := cl.cl.Broker(int(broker))
186 kresp, err := b.RetriableRequest(ctx, alter.req())
187 if err != nil {
188 return nil, err
189 }
190 resp := kresp.(*kmsg.AlterReplicaLogDirsResponse)
191 return newAlterLogDirsResp(broker, alter, resp), nil
192 }
193
194 func describeLogDirsReq(s TopicsSet) *kmsg.DescribeLogDirsRequest {
195 req := kmsg.NewPtrDescribeLogDirsRequest()
196 for t, ps := range s {
197 rt := kmsg.NewDescribeLogDirsRequestTopic()
198 rt.Topic = t
199 for p := range ps {
200 rt.Partitions = append(rt.Partitions, p)
201 }
202 req.Topics = append(req.Topics, rt)
203 }
204 return req
205 }
206
207
208
209 type DescribedAllLogDirs map[int32]DescribedLogDirs
210
211
212 func (ds DescribedAllLogDirs) Sorted() []DescribedLogDir {
213 var all []DescribedLogDir
214 ds.Each(func(d DescribedLogDir) {
215 all = append(all, d)
216 })
217 sort.Slice(all, func(i, j int) bool {
218 l, r := all[i], all[j]
219 return l.Broker < r.Broker || l.Broker == r.Broker && l.Dir < r.Dir
220 })
221 return all
222 }
223
224
225 func (ds DescribedAllLogDirs) Each(fn func(DescribedLogDir)) {
226 for _, bds := range ds {
227 bds.Each(fn)
228 }
229 }
230
231
232
233 type DescribedLogDirs map[string]DescribedLogDir
234
235
236 func (ds DescribedLogDirs) Lookup(d, t string, p int32) (DescribedLogDirPartition, bool) {
237 dir, exists := ds[d]
238 if !exists {
239 return DescribedLogDirPartition{}, false
240 }
241 ps, exists := dir.Topics[t]
242 if !exists {
243 return DescribedLogDirPartition{}, false
244 }
245 dp, exists := ps[p]
246 if !exists {
247 return DescribedLogDirPartition{}, false
248 }
249 return dp, true
250 }
251
252
253
254
255 func (ds DescribedLogDirs) LookupPartition(t string, p int32) (DescribedLogDirPartition, bool) {
256 for _, dir := range ds {
257 ps, exists := dir.Topics[t]
258 if !exists {
259 continue
260 }
261 dp, exists := ps[p]
262 if !exists {
263 continue
264 }
265 return dp, true
266 }
267 return DescribedLogDirPartition{}, false
268 }
269
270
271 func (ds DescribedLogDirs) Size() int64 {
272 var tot int64
273 ds.EachPartition(func(d DescribedLogDirPartition) {
274 tot += d.Size
275 })
276 return tot
277 }
278
279
280
281
282 func (ds DescribedLogDirs) Error() error {
283 for _, d := range ds {
284 if d.Err != nil {
285 return d.Err
286 }
287 }
288 return nil
289 }
290
291
292
293 func (ds DescribedLogDirs) Ok() bool {
294 return ds.Error() == nil
295 }
296
297
298 func (ds DescribedLogDirs) Sorted() []DescribedLogDir {
299 var all []DescribedLogDir
300 ds.Each(func(d DescribedLogDir) {
301 all = append(all, d)
302 })
303 sort.Slice(all, func(i, j int) bool {
304 l, r := all[i], all[j]
305 return l.Broker < r.Broker || l.Broker == r.Broker && l.Dir < r.Dir
306 })
307 return all
308 }
309
310
311
312 func (ds DescribedLogDirs) SortedPartitions() []DescribedLogDirPartition {
313 var all []DescribedLogDirPartition
314 ds.EachPartition(func(d DescribedLogDirPartition) {
315 all = append(all, d)
316 })
317 sort.Slice(all, func(i, j int) bool { return all[i].Less(all[j]) })
318 return all
319 }
320
321
322
323 func (ds DescribedLogDirs) SortedBySize() []DescribedLogDir {
324 var all []DescribedLogDir
325 ds.Each(func(d DescribedLogDir) {
326 all = append(all, d)
327 })
328 sort.Slice(all, func(i, j int) bool {
329 l, r := all[i], all[j]
330 ls, rs := l.Size(), r.Size()
331 return ls < rs || ls == rs &&
332 (l.Broker < r.Broker || l.Broker == r.Broker &&
333 l.Dir < r.Dir)
334 })
335 return all
336 }
337
338
339
340
341 func (ds DescribedLogDirs) SortedPartitionsBySize() []DescribedLogDirPartition {
342 var all []DescribedLogDirPartition
343 ds.EachPartition(func(d DescribedLogDirPartition) {
344 all = append(all, d)
345 })
346 sort.Slice(all, func(i, j int) bool { return all[i].LessBySize(all[j]) })
347 return all
348 }
349
350
351
352 func (ds DescribedLogDirs) SmallestPartitionBySize() (DescribedLogDirPartition, bool) {
353 sorted := ds.SortedPartitionsBySize()
354 if len(sorted) == 0 {
355 return DescribedLogDirPartition{}, false
356 }
357 return sorted[0], true
358 }
359
360
361
362 func (ds DescribedLogDirs) LargestPartitionBySize() (DescribedLogDirPartition, bool) {
363 sorted := ds.SortedPartitionsBySize()
364 if len(sorted) == 0 {
365 return DescribedLogDirPartition{}, false
366 }
367 return sorted[len(sorted)-1], true
368 }
369
370
371 func (ds DescribedLogDirs) Each(fn func(DescribedLogDir)) {
372 for _, d := range ds {
373 fn(d)
374 }
375 }
376
377
378 func (ds DescribedLogDirs) EachPartition(fn func(d DescribedLogDirPartition)) {
379 for _, d := range ds {
380 d.Topics.Each(fn)
381 }
382 }
383
384
385 func (ds DescribedLogDirs) EachError(fn func(DescribedLogDir)) {
386 for _, d := range ds {
387 if d.Err != nil {
388 fn(d)
389 }
390 }
391 }
392
393
394 type DescribedLogDir struct {
395 Broker int32
396 Dir string
397 Topics DescribedLogDirTopics
398 Err error
399 }
400
401
402
403 func (ds DescribedLogDir) Size() int64 {
404 return ds.Topics.Size()
405 }
406
407
408 type DescribedLogDirTopics map[string]map[int32]DescribedLogDirPartition
409
410
411 func (ds DescribedLogDirTopics) Lookup(t string, p int32) (DescribedLogDirPartition, bool) {
412 ps, exists := ds[t]
413 if !exists {
414 return DescribedLogDirPartition{}, false
415 }
416 d, exists := ps[p]
417 return d, exists
418 }
419
420
421 func (ds DescribedLogDirTopics) Size() int64 {
422 var tot int64
423 ds.Each(func(d DescribedLogDirPartition) {
424 tot += d.Size
425 })
426 return tot
427 }
428
429
430 func (ds DescribedLogDirTopics) Sorted() []DescribedLogDirPartition {
431 var all []DescribedLogDirPartition
432 ds.Each(func(d DescribedLogDirPartition) {
433 all = append(all, d)
434 })
435 sort.Slice(all, func(i, j int) bool { return all[i].Less(all[j]) })
436 return all
437 }
438
439
440
441 func (ds DescribedLogDirTopics) SortedBySize() []DescribedLogDirPartition {
442 var all []DescribedLogDirPartition
443 ds.Each(func(d DescribedLogDirPartition) {
444 all = append(all, d)
445 })
446 sort.Slice(all, func(i, j int) bool { return all[i].LessBySize(all[j]) })
447 return all
448 }
449
450
451 func (ds DescribedLogDirTopics) Each(fn func(p DescribedLogDirPartition)) {
452 for _, ps := range ds {
453 for _, d := range ps {
454 fn(d)
455 }
456 }
457 }
458
459
460
461 type DescribedLogDirPartition struct {
462 Broker int32
463 Dir string
464 Topic string
465 Partition int32
466 Size int64
467
468
469
470
471
472
473
474
475
476
477 OffsetLag int64
478
479
480
481 IsFuture bool
482 }
483
484
485
486 func (p DescribedLogDirPartition) Less(other DescribedLogDirPartition) bool {
487 if p.Broker < other.Broker {
488 return true
489 }
490 if p.Broker > other.Broker {
491 return false
492 }
493 if p.Dir < other.Dir {
494 return true
495 }
496 if p.Dir > other.Dir {
497 return false
498 }
499 if p.Topic < other.Topic {
500 return true
501 }
502 if p.Topic > other.Topic {
503 return false
504 }
505 if p.Partition < other.Partition {
506 return true
507 }
508 if p.Partition > other.Partition {
509 return false
510 }
511 return p.Size < other.Size
512 }
513
514
515
516 func (p DescribedLogDirPartition) LessBySize(other DescribedLogDirPartition) bool {
517 if p.Size < other.Size {
518 return true
519 }
520 return p.Less(other)
521 }
522
523 func newDescribeLogDirsResp(node int32, resp *kmsg.DescribeLogDirsResponse) DescribedLogDirs {
524 ds := make(DescribedLogDirs)
525 for _, rd := range resp.Dirs {
526 d := DescribedLogDir{
527 Broker: node,
528 Dir: rd.Dir,
529 Topics: make(DescribedLogDirTopics),
530 Err: kerr.ErrorForCode(rd.ErrorCode),
531 }
532 for _, rt := range rd.Topics {
533 t := make(map[int32]DescribedLogDirPartition)
534 d.Topics[rt.Topic] = t
535 for _, rp := range rt.Partitions {
536 t[rp.Partition] = DescribedLogDirPartition{
537 Broker: node,
538 Dir: rd.Dir,
539 Topic: rt.Topic,
540 Partition: rp.Partition,
541 Size: rp.Size,
542 OffsetLag: rp.OffsetLag,
543 IsFuture: rp.IsFuture,
544 }
545 }
546 }
547 ds[rd.Dir] = d
548 }
549 return ds
550 }
551
552
553
554
555
556
557 func (cl *Client) DescribeAllLogDirs(ctx context.Context, s TopicsSet) (DescribedAllLogDirs, error) {
558 req := describeLogDirsReq(s)
559 shards := cl.cl.RequestSharded(ctx, req)
560 resps := make(DescribedAllLogDirs)
561 return resps, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
562 resp := kr.(*kmsg.DescribeLogDirsResponse)
563 if err := maybeAuthErr(resp.ErrorCode); err != nil {
564 return err
565 }
566 if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
567 return err
568 }
569 resps[b.NodeID] = newDescribeLogDirsResp(b.NodeID, resp)
570 return nil
571 })
572 }
573
574
575
576
577 func (cl *Client) DescribeBrokerLogDirs(ctx context.Context, broker int32, s TopicsSet) (DescribedLogDirs, error) {
578 req := describeLogDirsReq(s)
579 b := cl.cl.Broker(int(broker))
580 kresp, err := b.RetriableRequest(ctx, req)
581 if err != nil {
582 return nil, err
583 }
584 resp := kresp.(*kmsg.DescribeLogDirsResponse)
585 if err := maybeAuthErr(resp.ErrorCode); err != nil {
586 return nil, err
587 }
588 if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
589 return nil, err
590 }
591 return newDescribeLogDirsResp(broker, resp), nil
592 }
593
View as plain text