1 // Package kadm provides a helper Kafka admin client around a *kgo.Client. 2 // 3 // This package is meant to cover the common use cases for dropping into an 4 // "admin" like interface for Kafka. As with any admin client, this package 5 // must make opinionated decisions on what to provide and what to hide. The 6 // underlying Kafka protocol gives more detailed information in responses, or 7 // allows more fine tuning in requests, but most of the time, these details are 8 // unnecessary. 9 // 10 // By virtue of making opinionated decisions, this package cannot satisfy every 11 // need for requests and responses. If you need more control than this admin 12 // client provides, you can use the kmsg package directly. 13 // 14 // This package contains a lot of types, but the main two types type to know 15 // are Client and ShardErrors. Every other type is used for inputs or outputs 16 // to methods on the client. 17 // 18 // The Client type is a simple small wrapper around a *kgo.Client that exists 19 // solely to namespace methods. The ShardErrors type is a bit more complicated. 20 // When issuing requests, under the hood some of these requests actually need 21 // to be mapped to brokers and split, issuing different pieces of the input 22 // request to different brokers. The *kgo.Client handles this all internally, 23 // but (if using RequestSharded as directed), returns each response to each of 24 // these split requests individually. Each response can fail or be successful. 25 // This package goes one step further and merges these failures into one meta 26 // failure, ShardErrors. Any function that returns ShardErrors is documented as 27 // such, and if a function returns a non-nil ShardErrors, it is possible that 28 // the returned data is actually valid and usable. If you care to, you can log 29 // / react to the partial failures and continue using the partial successful 30 // result. This is in contrast to other clients, which either require to to 31 // request individual brokers directly, or they completely hide individual 32 // failures, or they completely fail on any individual failure. 33 // 34 // For methods that list or describe things, this package often completely 35 // fails responses on auth failures. If you use a method that accepts two 36 // topics, one that you are authorized to and one that you are not, you will 37 // not receive a partial successful response. Instead, you will receive an 38 // AuthError. Methods that do *not* fail on auth errors are explicitly 39 // documented as such. 40 // 41 // Users may often find it easy to work with lists of topics or partitions. 42 // Rather than needing to build deeply nested maps directly, this package has a 43 // few helper types that are worth knowing: 44 // 45 // TopicsList - a slice of topics and their partitions 46 // TopicsSet - a set of topics, each containing a set of partitions 47 // Partitions - a slice of partitions 48 // OffsetsList - a slice of offsets 49 // Offsets - a map of offsets 50 // 51 // These types are meant to be easy to build and use, and can be used as the 52 // starting point for other types. 53 // 54 // Many functions in this package are variadic and return either a map or a 55 // list of responses, and you may only use one element as input and are only 56 // interested in one element of output. This package provides the following 57 // functions to help: 58 // 59 // Any(map) 60 // AnyE(map, err) 61 // First(slice) 62 // FirstE(slice, err) 63 // 64 // The intended use case of these is something like `kadm.AnyE(kadm.CreateTopics(..., "my-one-topic"))`, 65 // such that you can immediately get the response for the one topic you are 66 // creating. 67 package kadm 68 69 import ( 70 "errors" 71 "regexp" 72 "runtime/debug" 73 "sort" 74 "sync" 75 76 "github.com/twmb/franz-go/pkg/kgo" 77 ) 78 79 func unptrStr(s *string) string { 80 if s == nil { 81 return "" 82 } 83 return *s 84 } 85 86 var ( 87 reVersion *regexp.Regexp 88 reVersionOnce sync.Once 89 ) 90 91 // Copied from kgo, but we use the kadm package version. 92 func softwareVersion() string { 93 info, ok := debug.ReadBuildInfo() 94 if ok { 95 reVersionOnce.Do(func() { reVersion = regexp.MustCompile(`^[a-zA-Z0-9](?:[a-zA-Z0-9.-]*[a-zA-Z0-9])?$`) }) 96 for _, dep := range info.Deps { 97 if dep.Path == "github.com/twmb/franz-go/pkg/kadm" { 98 if reVersion.MatchString(dep.Version) { 99 return dep.Version 100 } 101 } 102 } 103 } 104 return "unknown" 105 } 106 107 // Client is an admin client. 108 // 109 // This is a simple wrapper around a *kgo.Client to provide helper admin methods. 110 type Client struct { 111 cl *kgo.Client 112 113 timeoutMillis int32 114 } 115 116 // NewClient returns an admin client. 117 func NewClient(cl *kgo.Client) *Client { 118 return &Client{cl, 15000} // 15s timeout default, matching kmsg 119 } 120 121 // NewOptClient returns a new client directly from kgo options. This is a 122 // wrapper around creating a new *kgo.Client and then creating an admin client. 123 func NewOptClient(opts ...kgo.Opt) (*Client, error) { 124 cl, err := kgo.NewClient(opts...) 125 if err != nil { 126 return nil, err 127 } 128 return NewClient(cl), nil 129 } 130 131 // Close closes the underlying *kgo.Client. 132 func (cl *Client) Close() { 133 cl.cl.Close() 134 } 135 136 // SetTimeoutMillis sets the timeout to use for requests that have a timeout, 137 // overriding the default of 15,000 (15s). 138 // 139 // Not all requests have timeouts. Most requests are expected to return 140 // immediately or are expected to deliberately hang. The following requests 141 // have timeout fields: 142 // 143 // Produce 144 // CreateTopics 145 // DeleteTopics 146 // DeleteRecords 147 // CreatePartitions 148 // ElectLeaders 149 // AlterPartitionAssignments 150 // ListPartitionReassignments 151 // UpdateFeatures 152 // 153 // Not all requests above are supported in the admin API. 154 func (cl *Client) SetTimeoutMillis(millis int32) { 155 cl.timeoutMillis = millis 156 } 157 158 // StringPtr is a shortcut function to aid building configs for creating or 159 // altering topics. 160 func StringPtr(s string) *string { 161 return &s 162 } 163 164 // BrokerDetail is a type alias for kgo.BrokerMetadata. 165 type BrokerDetail = kgo.BrokerMetadata 166 167 // BrokerDetails contains the details for many brokers. 168 type BrokerDetails []BrokerDetail 169 170 // NodeIDs returns the IDs of all nodes. 171 func (ds BrokerDetails) NodeIDs() []int32 { 172 var all []int32 173 for _, d := range ds { 174 all = append(all, d.NodeID) 175 } 176 return int32s(all) 177 } 178 179 // Partition is a partition for a topic. 180 type Partition struct { 181 Topic string // Topic is the topic for this partition. 182 Partition int32 // Partition is this partition's number. 183 } 184 185 // Offset is an offset for a topic. 186 type Offset struct { 187 Topic string 188 Partition int32 189 At int64 // Offset is the partition to set. 190 LeaderEpoch int32 // LeaderEpoch is the broker leader epoch of the record at this offset. 191 Metadata string // Metadata, if non-empty, is used for offset commits. 192 } 193 194 // Partitions wraps many partitions. 195 type Partitions []Partition 196 197 // TopicsSet returns these partitions as TopicsSet. 198 func (ps Partitions) TopicsSet() TopicsSet { 199 s := make(TopicsSet) 200 for _, p := range ps { 201 s.Add(p.Topic, p.Partition) 202 } 203 return s 204 } 205 206 // TopicsList returns these partitions as sorted TopicsList. 207 func (ps Partitions) TopicsList() TopicsList { 208 return ps.TopicsSet().Sorted() 209 } 210 211 // OffsetsList wraps many offsets and is a helper for building Offsets. 212 type OffsetsList []Offset 213 214 // Offsets returns this list as the non-list Offsets. All fields in each 215 // Offset must be set properly. 216 func (l OffsetsList) Offsets() Offsets { 217 os := make(Offsets) 218 for _, o := range l { 219 os.Add(o) 220 } 221 return os 222 } 223 224 // KOffsets returns this list as a kgo offset map. 225 func (l OffsetsList) KOffsets() map[string]map[int32]kgo.Offset { 226 return l.Offsets().KOffsets() 227 } 228 229 // Offsets wraps many offsets and is the type used for offset functions. 230 type Offsets map[string]map[int32]Offset 231 232 // Lookup returns the offset at t and p and whether it exists. 233 func (os Offsets) Lookup(t string, p int32) (Offset, bool) { 234 if len(os) == 0 { 235 return Offset{}, false 236 } 237 ps := os[t] 238 if len(ps) == 0 { 239 return Offset{}, false 240 } 241 o, exists := ps[p] 242 return o, exists 243 } 244 245 // Add adds an offset for a given topic/partition to this Offsets map. 246 // 247 // If the partition already exists, the offset is only added if: 248 // 249 // - the new leader epoch is higher than the old, or 250 // - the leader epochs equal, and the new offset is higher than the old 251 // 252 // If you would like to add offsets forcefully no matter what, use the Delete 253 // method before this. 254 func (os *Offsets) Add(o Offset) { 255 if *os == nil { 256 *os = make(map[string]map[int32]Offset) 257 } 258 ot := (*os)[o.Topic] 259 if ot == nil { 260 ot = make(map[int32]Offset) 261 (*os)[o.Topic] = ot 262 } 263 264 prior, exists := ot[o.Partition] 265 if !exists || prior.LeaderEpoch < o.LeaderEpoch || 266 prior.LeaderEpoch == o.LeaderEpoch && prior.At < o.At { 267 ot[o.Partition] = o 268 } 269 } 270 271 // Delete removes any offset at topic t and partition p. 272 func (os Offsets) Delete(t string, p int32) { 273 if os == nil { 274 return 275 } 276 ot := os[t] 277 if ot == nil { 278 return 279 } 280 delete(ot, p) 281 if len(ot) == 0 { 282 delete(os, t) 283 } 284 } 285 286 // AddOffset is a helper to add an offset for a given topic and partition. The 287 // leader epoch field must be -1 if you do not know the leader epoch or if 288 // you do not have an offset yet. 289 func (os *Offsets) AddOffset(t string, p int32, o int64, leaderEpoch int32) { 290 os.Add(Offset{ 291 Topic: t, 292 Partition: p, 293 At: o, 294 LeaderEpoch: leaderEpoch, 295 }) 296 } 297 298 // KeepFunc calls fn for every offset, keeping the offset if fn returns true. 299 func (os Offsets) KeepFunc(fn func(o Offset) bool) { 300 for t, ps := range os { 301 for p, o := range ps { 302 if !fn(o) { 303 delete(ps, p) 304 } 305 } 306 if len(ps) == 0 { 307 delete(os, t) 308 } 309 } 310 } 311 312 // DeleteFunc calls fn for every offset, deleting the offset if fn returns 313 // true. 314 func (os Offsets) DeleteFunc(fn func(o Offset) bool) { 315 os.KeepFunc(func(o Offset) bool { return !fn(o) }) 316 } 317 318 // Topics returns the set of topics and partitions currently used in these 319 // offsets. 320 func (os Offsets) TopicsSet() TopicsSet { 321 s := make(TopicsSet) 322 os.Each(func(o Offset) { s.Add(o.Topic, o.Partition) }) 323 return s 324 } 325 326 // Each calls fn for each offset in these offsets. 327 func (os Offsets) Each(fn func(Offset)) { 328 for _, ps := range os { 329 for _, o := range ps { 330 fn(o) 331 } 332 } 333 } 334 335 // KOffsets returns these offsets as a kgo offset map. 336 func (os Offsets) KOffsets() map[string]map[int32]kgo.Offset { 337 tskgo := make(map[string]map[int32]kgo.Offset) 338 for t, ps := range os { 339 pskgo := make(map[int32]kgo.Offset) 340 for p, o := range ps { 341 pskgo[p] = kgo.NewOffset(). 342 At(o.At). 343 WithEpoch(o.LeaderEpoch) 344 } 345 tskgo[t] = pskgo 346 } 347 return tskgo 348 } 349 350 // Sorted returns the offsets sorted by topic and partition. 351 func (os Offsets) Sorted() []Offset { 352 var s []Offset 353 os.Each(func(o Offset) { s = append(s, o) }) 354 sort.Slice(s, func(i, j int) bool { 355 return s[i].Topic < s[j].Topic || 356 s[i].Topic == s[j].Topic && s[i].Partition < s[j].Partition 357 }) 358 return s 359 } 360 361 // OffsetsFromFetches returns Offsets for the final record in any partition in 362 // the fetches. This is a helper to enable committing an entire returned batch. 363 // 364 // This function looks at only the last record per partition, assuming that the 365 // last record is the highest offset (which is the behavior returned by kgo's 366 // Poll functions). The returned offsets are one past the offset contained in 367 // the records. 368 func OffsetsFromFetches(fs kgo.Fetches) Offsets { 369 os := make(Offsets) 370 fs.EachPartition(func(p kgo.FetchTopicPartition) { 371 if len(p.Records) == 0 { 372 return 373 } 374 r := p.Records[len(p.Records)-1] 375 os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch) 376 }) 377 return os 378 } 379 380 // OffsetsFromRecords returns offsets for all given records, using the highest 381 // offset per partition. The returned offsets are one past the offset contained 382 // in the records. 383 func OffsetsFromRecords(rs ...kgo.Record) Offsets { 384 os := make(Offsets) 385 for _, r := range rs { 386 os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch) 387 } 388 return os 389 } 390 391 // TopicsSet is a set of topics and, per topic, a set of partitions. 392 // 393 // All methods provided for TopicsSet are safe to use on a nil (default) set. 394 type TopicsSet map[string]map[int32]struct{} 395 396 // Lookup returns whether the topic and partition exists. 397 func (s TopicsSet) Lookup(t string, p int32) bool { 398 if len(s) == 0 { 399 return false 400 } 401 ps := s[t] 402 if len(ps) == 0 { 403 return false 404 } 405 _, exists := ps[p] 406 return exists 407 } 408 409 // Each calls fn for each topic / partition in the topics set. 410 func (s TopicsSet) Each(fn func(t string, p int32)) { 411 for t, ps := range s { 412 for p := range ps { 413 fn(t, p) 414 } 415 } 416 } 417 418 // EachPartitions calls fn for each topic and its partitions in the topics set. 419 func (s TopicsSet) EachPartitions(fn func(t string, ps []int32)) { 420 for t, ps := range s { 421 sliced := make([]int32, 0, len(ps)) 422 for p := range ps { 423 sliced = append(sliced, p) 424 } 425 fn(t, sliced) 426 } 427 } 428 429 // EmptyTopics returns all topics with no partitions. 430 func (s TopicsSet) EmptyTopics() []string { 431 var e []string 432 for t, ps := range s { 433 if len(ps) == 0 { 434 e = append(e, t) 435 } 436 } 437 return e 438 } 439 440 // Add adds partitions for a topic to the topics set. If no partitions are 441 // added, this still creates the topic. 442 func (s *TopicsSet) Add(t string, ps ...int32) { 443 if *s == nil { 444 *s = make(map[string]map[int32]struct{}) 445 } 446 existing := (*s)[t] 447 if existing == nil { 448 existing = make(map[int32]struct{}, len(ps)) 449 (*s)[t] = existing 450 } 451 for _, p := range ps { 452 existing[p] = struct{}{} 453 } 454 } 455 456 // Delete removes partitions from a topic from the topics set. If the topic 457 // ends up with no partitions, the topic is removed from the set. 458 func (s TopicsSet) Delete(t string, ps ...int32) { 459 if s == nil || len(ps) == 0 { 460 return 461 } 462 existing := s[t] 463 if existing == nil { 464 return 465 } 466 for _, p := range ps { 467 delete(existing, p) 468 } 469 if len(existing) == 0 { 470 delete(s, t) 471 } 472 } 473 474 // Topics returns all topics in this set in sorted order. 475 func (s TopicsSet) Topics() []string { 476 ts := make([]string, 0, len(s)) 477 for t := range s { 478 ts = append(ts, t) 479 } 480 sort.Strings(ts) 481 return ts 482 } 483 484 // Merge merges another topic set into this one. 485 func (s TopicsSet) Merge(other TopicsSet) { 486 for t, ps := range other { 487 for p := range ps { 488 s.Add(t, p) 489 } 490 } 491 } 492 493 // IntoList returns this set as a list. 494 func (s TopicsSet) IntoList() TopicsList { 495 l := make(TopicsList, 0, len(s)) 496 for t, ps := range s { 497 lps := make([]int32, 0, len(ps)) 498 for p := range ps { 499 lps = append(lps, p) 500 } 501 l = append(l, TopicPartitions{ 502 Topic: t, 503 Partitions: lps, 504 }) 505 } 506 return l 507 } 508 509 // Sorted returns this set as a list in topic-sorted order, with each topic 510 // having sorted partitions. 511 func (s TopicsSet) Sorted() TopicsList { 512 l := make(TopicsList, 0, len(s)) 513 for t, ps := range s { 514 tps := TopicPartitions{ 515 Topic: t, 516 Partitions: make([]int32, 0, len(ps)), 517 } 518 for p := range ps { 519 tps.Partitions = append(tps.Partitions, p) 520 } 521 tps.Partitions = int32s(tps.Partitions) 522 l = append(l, tps) 523 } 524 sort.Slice(l, func(i, j int) bool { return l[i].Topic < l[j].Topic }) 525 return l 526 } 527 528 // TopicPartitions is a topic and partitions. 529 type TopicPartitions struct { 530 Topic string 531 Partitions []int32 532 } 533 534 // TopicsList is a list of topics and partitions. 535 type TopicsList []TopicPartitions 536 537 // Each calls fn for each topic / partition in the topics list. 538 func (l TopicsList) Each(fn func(t string, p int32)) { 539 for _, t := range l { 540 for _, p := range t.Partitions { 541 fn(t.Topic, p) 542 } 543 } 544 } 545 546 // EachPartitions calls fn for each topic and its partitions in the topics 547 // list. 548 func (l TopicsList) EachPartitions(fn func(t string, ps []int32)) { 549 for _, t := range l { 550 fn(t.Topic, t.Partitions) 551 } 552 } 553 554 // EmptyTopics returns all topics with no partitions. 555 func (l TopicsList) EmptyTopics() []string { 556 var e []string 557 for _, t := range l { 558 if len(t.Partitions) == 0 { 559 e = append(e, t.Topic) 560 } 561 } 562 return e 563 } 564 565 // Topics returns all topics in this set in sorted order. 566 func (l TopicsList) Topics() []string { 567 ts := make([]string, 0, len(l)) 568 for _, t := range l { 569 ts = append(ts, t.Topic) 570 } 571 sort.Strings(ts) 572 return ts 573 } 574 575 // IntoSet returns this list as a set. 576 func (l TopicsList) IntoSet() TopicsSet { 577 s := make(TopicsSet) 578 for _, t := range l { 579 s.Add(t.Topic, t.Partitions...) 580 } 581 return s 582 } 583 584 // First returns the first element of the input slice and whether it exists. 585 // This is the non-error-accepting equivalent of FirstE. 586 // 587 // Many client methods in kadm accept a variadic amount of input arguments and 588 // return either a slice or a map of responses, but you often use the method 589 // with only one argument. This function can help extract the one response you 590 // are interested in. 591 func First[S ~[]T, T any](s S) (T, bool) { 592 if len(s) == 0 { 593 var t T 594 return t, false 595 } 596 return s[0], true 597 } 598 599 // Any returns the first range element of the input map and whether it exists. 600 // This is the non-error-accepting equivalent of AnyE. 601 // 602 // Many client methods in kadm accept a variadic amount of input arguments and 603 // return either a slice or a map of responses, but you often use the method 604 // with only one argument. This function can help extract the one response you 605 // are interested in. 606 func Any[M ~map[K]V, K comparable, V any](m M) (V, bool) { 607 for _, v := range m { 608 return v, true 609 } 610 var v V 611 return v, false 612 } 613 614 // ErrEmpty is returned from FirstE or AnyE if the input is empty. 615 var ErrEmpty = errors.New("empty") 616 617 // FirstE returns the first element of the input slice, or the input error 618 // if it is non-nil. If the error is nil but the slice is empty, this returns 619 // ErrEmpty. This is the error-accepting equivalent of First. 620 // 621 // Many client methods in kadm accept a variadic amount of input arguments and 622 // return either a slice or a map of responses, but you often use the method 623 // with only one argument. This function can help extract the one response you 624 // are interested in. 625 func FirstE[S ~[]T, T any](s S, err error) (T, error) { 626 if err != nil { 627 var t T 628 return t, err 629 } 630 if len(s) == 0 { 631 var t T 632 return t, ErrEmpty 633 } 634 return s[0], err 635 } 636 637 // AnyE returns the first range element of the input map, or the input error if 638 // it is non-nil. If the error is nil but the map is empty, this returns 639 // ErrEmpty. This is the error-accepting equivalent of Any. 640 // 641 // Many client methods in kadm accept a variadic amount of input arguments and 642 // return either a slice or a map of responses, but you often use the method 643 // with only one argument. This function can help extract the one response you 644 // are interested in. 645 func AnyE[M ~map[K]V, K comparable, V any](m M, err error) (V, error) { 646 if err != nil { 647 var v V 648 return v, err 649 } 650 for _, v := range m { 651 return v, nil 652 } 653 var v V 654 return v, ErrEmpty 655 } 656