1 // Package kversion specifies versions for Kafka request keys. 2 // 3 // Kafka technically has internal broker versions that bump multiple times per 4 // release. This package only defines releases and tip. 5 package kversion 6 7 import ( 8 "bytes" 9 "fmt" 10 "text/tabwriter" 11 12 "github.com/twmb/franz-go/pkg/kmsg" 13 ) 14 15 // Versions is a list of versions, with each item corresponding to a Kafka key 16 // and each item's value corresponding to the max version supported. 17 // 18 // Minimum versions are not currently tracked because all keys have a minimum 19 // version of zero. The internals of a Versions may change in the future to 20 // support minimum versions; the outward facing API of Versions should not 21 // change to support this. 22 // 23 // As well, supported features may be added in the future. 24 type Versions struct { 25 // If any version is -1, then it is left out in that version. 26 // This was first done in version 2.7.0, where Kafka added support 27 // for 52, 53, 54, 55, but it was not a part of the 2.7.0 release, 28 // so ApiVersionsResponse goes from 51 to 56. 29 k2v []int16 30 } 31 32 // FromApiVersionsResponse returns a Versions from a kmsg.ApiVersionsResponse. 33 func FromApiVersionsResponse(r *kmsg.ApiVersionsResponse) *Versions { 34 var v Versions 35 for _, key := range r.ApiKeys { 36 v.SetMaxKeyVersion(key.ApiKey, key.MaxVersion) 37 } 38 return &v 39 } 40 41 // HasKey returns true if the versions contains the given key. 42 func (vs *Versions) HasKey(k int16) bool { 43 _, has := vs.LookupMaxKeyVersion(k) 44 return has 45 } 46 47 // LookupMaxKeyVersion returns the version for the given key and whether the 48 // key exists. If the key does not exist, this returns (-1, false). 
49 func (vs *Versions) LookupMaxKeyVersion(k int16) (int16, bool) { 50 if k < 0 { 51 return -1, false 52 } 53 if int(k) >= len(vs.k2v) { 54 return -1, false 55 } 56 version := vs.k2v[k] 57 if version < 0 { 58 return -1, false 59 } 60 return version, true 61 } 62 63 // SetMaxKeyVersion sets the max version for the given key. 64 // 65 // Setting a version to -1 unsets the key. 66 // 67 // Versions are backed by a slice; if the slice is not long enough, it is 68 // extended to fit the key. 69 func (vs *Versions) SetMaxKeyVersion(k, v int16) { 70 if v < 0 { 71 v = -1 72 } 73 // If the version is < 0, we are unsetting a version. If we are 74 // unsetting a version that is more than the amount of keys we already 75 // have, we have no reason to unset. 76 if k < 0 || v < 0 && int(k) >= len(vs.k2v)+1 { 77 return 78 } 79 needLen := int(k) + 1 80 for len(vs.k2v) < needLen { 81 vs.k2v = append(vs.k2v, -1) 82 } 83 vs.k2v[k] = v 84 } 85 86 // Equal returns whether two versions are equal. 87 func (vs *Versions) Equal(other *Versions) bool { 88 // We allow the version slices to be of different lengths, so long as 89 // the versions for keys in one and not the other are -1. 90 // 91 // Basically, all non-negative-one keys must be equal. 92 long, short := vs.k2v, other.k2v 93 if len(short) > len(long) { 94 long, short = short, long 95 } 96 for i, v := range short { 97 if v != long[i] { 98 return false 99 } 100 } 101 for _, v := range long[len(short):] { 102 if v >= 0 { 103 return false 104 } 105 } 106 return true 107 } 108 109 // EachMaxKeyVersion calls fn for each key and max version 110 func (vs *Versions) EachMaxKeyVersion(fn func(k, v int16)) { 111 for k, v := range vs.k2v { 112 if v >= 0 { 113 fn(int16(k), v) 114 } 115 } 116 } 117 118 // VersionGuessOpt is an option to change how version guessing is done. 
119 type VersionGuessOpt interface { 120 apply(*guessCfg) 121 } 122 123 type guessOpt struct{ fn func(*guessCfg) } 124 125 func (opt guessOpt) apply(cfg *guessCfg) { opt.fn(cfg) } 126 127 // SkipKeys skips the given keys while guessing versions. 128 // 129 // The current default is to skip keys that are only used by brokers: 130 // 131 // 4: LeaderAndISR 132 // 5: StopReplica 133 // 6: UpdateMetadata 134 // 7: ControlledShutdown 135 // 27: WriteTxnMarkers 136 func SkipKeys(keys ...int16) VersionGuessOpt { 137 return guessOpt{func(cfg *guessCfg) { cfg.skipKeys = keys }} 138 } 139 140 // TryRaftBroker changes from guessing the version for a classical ZooKeeper 141 // based broker to guessing for a raft based broker (v2.8+). 142 // 143 // Note that with raft, there can be a TryRaftController attempt as well. 144 func TryRaftBroker() VersionGuessOpt { 145 return guessOpt{func(cfg *guessCfg) { cfg.listener = rBroker }} 146 } 147 148 // TryRaftController changes from guessing the version for a classical 149 // ZooKeeper based broker to guessing for a raft based controller broker 150 // (v2.8+). 151 // 152 // Note that with raft, there can be a TryRaftBroker attempt as well. Odds are 153 // that if you are an end user speaking to a raft based Kafka cluster, you are 154 // speaking to a raft broker. The controller is specifically for broker to 155 // broker communication. 156 func TryRaftController() VersionGuessOpt { 157 return guessOpt{func(cfg *guessCfg) { cfg.listener = rController }} 158 } 159 160 type guessCfg struct { 161 skipKeys []int16 162 listener listener 163 } 164 165 // VersionGuess attempts to guess which version of Kafka these versions belong 166 // to. If an exact match can be determined, this returns a string in the format 167 // v0.#.# or v#.# (depending on whether Kafka is pre-1.0 or post). For 168 // example, v0.8.0 or v2.7. 
169 // 170 // Patch numbers are not included in the guess as it is not possible to 171 // determine the Kafka patch version being used as a client. 172 // 173 // If the version is determined to be higher than kversion knows of or is tip, 174 // this package returns "at least v#.#". 175 // 176 // Custom versions, or in-between versions, are detected and return slightly 177 // more verbose strings. 178 // 179 // Options can be specified to change how version guessing is performed, for 180 // example, certain keys can be skipped, or the guessing can try evaluating the 181 // versions as Raft broker based versions. 182 // 183 // Internally, this function tries guessing the version against both KRaft and 184 // Kafka APIs. The more exact match is returned. 185 func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string { 186 standard := vs.versionGuess(opts...) 187 raftBroker := vs.versionGuess(append(opts, TryRaftBroker())...) 188 raftController := vs.versionGuess(append(opts, TryRaftController())...) 189 190 // If any of these are exact, return the exact guess. 191 for _, g := range []guess{ 192 standard, 193 raftBroker, 194 raftController, 195 } { 196 if g.how == guessExact { 197 return g.String() 198 } 199 } 200 201 // If any are atLeast, that means it is newer than we can guess and we 202 // return the highest version. 203 for _, g := range []guess{ 204 standard, 205 raftBroker, 206 raftController, 207 } { 208 if g.how == guessAtLeast { 209 return g.String() 210 } 211 } 212 213 // This is a custom version. We could do some advanced logic to try to 214 // return highest of all three guesses, but that may be inaccurate: 215 // KRaft may detect a higher guess because not all requests exist in 216 // KRaft. Instead, we just return our standard guess. 
217 return standard.String() 218 } 219 220 type guess struct { 221 v1 string 222 v2 string // for between 223 how int8 224 } 225 226 const ( 227 guessExact = iota 228 guessAtLeast 229 guessCustomUnknown 230 guessCustomAtLeast 231 guessBetween 232 guessNotEven 233 ) 234 235 func (g guess) String() string { 236 switch g.how { 237 case guessExact: 238 return g.v1 239 case guessAtLeast: 240 return "at least " + g.v1 241 case guessCustomUnknown: 242 return "unknown custom version" 243 case guessCustomAtLeast: 244 return "unknown custom version at least " + g.v1 245 case guessBetween: 246 return "between " + g.v1 + " and " + g.v2 247 case guessNotEven: 248 return "not even " + g.v1 249 } 250 return g.v1 251 } 252 253 func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess { 254 cfg := guessCfg{ 255 listener: zkBroker, 256 // Envelope was added in 2.7 for kraft and zkBroker in 3.4; we 257 // need to skip it for 2.7 through 3.4 otherwise the version 258 // detection fails. We can just skip it generally since there 259 // are enough differentiating factors that accurately detecting 260 // envelope doesn't matter. 261 // 262 // TODO: add introduced-version to differentiate some specific 263 // keys. 
264 skipKeys: []int16{4, 5, 6, 7, 27, 58}, 265 } 266 for _, opt := range opts { 267 opt.apply(&cfg) 268 } 269 270 skip := make(map[int16]bool, len(cfg.skipKeys)) 271 for _, k := range cfg.skipKeys { 272 skip[k] = true 273 } 274 275 var last string 276 cmp := make(map[int16]int16, len(maxTip)) 277 cmpskip := make(map[int16]int16) 278 for _, comparison := range []struct { 279 cmp listenerKeys 280 name string 281 }{ 282 {max080, "v0.8.0"}, 283 {max081, "v0.8.1"}, 284 {max082, "v0.8.2"}, 285 {max090, "v0.9.0"}, 286 {max0100, "v0.10.0"}, 287 {max0101, "v0.10.1"}, 288 {max0102, "v0.10.2"}, 289 {max0110, "v0.11.0"}, 290 {max100, "v1.0"}, 291 {max110, "v1.1"}, 292 {max200, "v2.0"}, 293 {max210, "v2.1"}, 294 {max220, "v2.2"}, 295 {max230, "v2.3"}, 296 {max240, "v2.4"}, 297 {max250, "v2.5"}, 298 {max260, "v2.6"}, 299 {max270, "v2.7"}, 300 {max280, "v2.8"}, 301 {max300, "v3.0"}, 302 {max310, "v3.1"}, 303 {max320, "v3.2"}, 304 {max330, "v3.3"}, 305 {max340, "v3.4"}, 306 {max350, "v3.5"}, 307 {max360, "v3.6"}, 308 } { 309 for k, v := range comparison.cmp.filter(cfg.listener) { 310 if v == -1 { 311 continue 312 } 313 k16 := int16(k) 314 if skip[k16] { 315 cmpskip[k16] = v 316 } else { 317 cmp[k16] = v 318 } 319 } 320 321 var under, equal, over bool 322 323 for k, v := range vs.k2v { 324 k16 := int16(k) 325 if skip[k16] { 326 skipv, ok := cmpskip[k16] 327 if v == -1 || !ok { 328 continue 329 } 330 cmp[k16] = skipv 331 } 332 cmpv, has := cmp[k16] 333 if has { 334 // If our version for this key is less than the 335 // comparison versions, then we are less than what we 336 // are comparing. 337 if v < cmpv { 338 under = true 339 } else if v > cmpv { 340 // Similarly, if our version is more, then we 341 // are over what we are comparing. 
342 over = true 343 } else { 344 equal = true 345 } 346 delete(cmp, k16) 347 } else if v >= 0 { 348 // If what we are comparing to does not even have this 349 // key **and** our version is larger non-zero, then our 350 // version is larger than what we are comparing to. 351 // 352 // We can have a negative version if a key was manually 353 // unset. 354 over = true 355 } 356 // If the version is < 0, the key is unset. 357 } 358 359 // If our versions did not clear out what we are comparing against, we 360 // do not have all keys that we need for this version. 361 if len(cmp) > 0 { 362 under = true 363 } 364 365 current := comparison.name 366 switch { 367 case under && over: 368 // Regardless of equal being true or not, this is a custom version. 369 if last != "" { 370 return guess{v1: last, how: guessCustomAtLeast} 371 } 372 return guess{v1: last, how: guessCustomUnknown} 373 374 case under: 375 // Regardless of equal being true or not, we have not yet hit 376 // this version. 377 if last != "" { 378 return guess{v1: last, v2: current, how: guessBetween} 379 } 380 return guess{v1: current, how: guessNotEven} 381 382 case over: 383 // Regardless of equal being true or not, we try again. 384 last = current 385 386 case equal: 387 return guess{v1: current, how: guessExact} 388 } 389 // At least one of under, equal, or over must be true, so there 390 // is no default case. 391 } 392 393 return guess{v1: last, how: guessAtLeast} 394 } 395 396 // String returns a string representation of the versions; the format may 397 // change. 398 func (vs *Versions) String() string { 399 var buf bytes.Buffer 400 w := tabwriter.NewWriter(&buf, 0, 0, 2, ' ', 0) 401 for k, v := range vs.k2v { 402 if v < 0 { 403 continue 404 } 405 name := kmsg.NameForKey(int16(k)) 406 if name == "" { 407 name = "Unknown" 408 } 409 fmt.Fprintf(w, "%s\t%d\n", name, v) 410 } 411 w.Flush() 412 return buf.String() 413 } 414 415 // Stable is a shortcut for the latest _released_ Kafka versions. 
416 // 417 // This is the default version used in kgo to avoid breaking tip changes. 418 func Stable() *Versions { return zkBrokerOf(maxStable) } 419 420 // Tip is the latest defined Kafka key versions; this may be slightly out of date. 421 func Tip() *Versions { return zkBrokerOf(maxTip) } 422 423 func V0_8_0() *Versions { return zkBrokerOf(max080) } 424 func V0_8_1() *Versions { return zkBrokerOf(max081) } 425 func V0_8_2() *Versions { return zkBrokerOf(max082) } 426 func V0_9_0() *Versions { return zkBrokerOf(max090) } 427 func V0_10_0() *Versions { return zkBrokerOf(max0100) } 428 func V0_10_1() *Versions { return zkBrokerOf(max0101) } 429 func V0_10_2() *Versions { return zkBrokerOf(max0102) } 430 func V0_11_0() *Versions { return zkBrokerOf(max0110) } 431 func V1_0_0() *Versions { return zkBrokerOf(max100) } 432 func V1_1_0() *Versions { return zkBrokerOf(max110) } 433 func V2_0_0() *Versions { return zkBrokerOf(max200) } 434 func V2_1_0() *Versions { return zkBrokerOf(max210) } 435 func V2_2_0() *Versions { return zkBrokerOf(max220) } 436 func V2_3_0() *Versions { return zkBrokerOf(max230) } 437 func V2_4_0() *Versions { return zkBrokerOf(max240) } 438 func V2_5_0() *Versions { return zkBrokerOf(max250) } 439 func V2_6_0() *Versions { return zkBrokerOf(max260) } 440 func V2_7_0() *Versions { return zkBrokerOf(max270) } 441 func V2_8_0() *Versions { return zkBrokerOf(max280) } 442 func V3_0_0() *Versions { return zkBrokerOf(max300) } 443 func V3_1_0() *Versions { return zkBrokerOf(max310) } 444 func V3_2_0() *Versions { return zkBrokerOf(max320) } 445 func V3_3_0() *Versions { return zkBrokerOf(max330) } 446 func V3_4_0() *Versions { return zkBrokerOf(max340) } 447 func V3_5_0() *Versions { return zkBrokerOf(max350) } 448 449 /* TODO wait for franz-go v1.16 450 func V3_6_0() *Versions { return zkBrokerOf(max360) } 451 */ 452 453 func zkBrokerOf(lks listenerKeys) *Versions { 454 return &Versions{lks.filter(zkBroker)} 455 } 456 457 type listener uint8 458 459 
func (l listener) has(target listener) bool { 460 return l&target != 0 461 } 462 463 const ( 464 zkBroker listener = 1 << iota 465 rBroker 466 rController 467 ) 468 469 type listenerKey struct { 470 listener listener 471 version int16 472 } 473 474 type listenerKeys []listenerKey 475 476 func (lks listenerKeys) filter(listener listener) []int16 { 477 r := make([]int16, 0, len(lks)) 478 for _, lk := range lks { 479 if lk.listener.has(listener) { 480 r = append(r, lk.version) 481 } else { 482 r = append(r, -1) 483 } 484 } 485 return r 486 } 487 488 // All requests before KRaft started being introduced support the zkBroker, but 489 // KRaft changed that. Kafka commit 698319b8e2c1f6cb574f339eede6f2a5b1919b55 490 // added which listeners support which API keys. 491 func k(listeners ...listener) listenerKey { 492 var k listenerKey 493 for _, listener := range listeners { 494 k.listener |= listener 495 } 496 return k 497 } 498 499 func (l *listenerKey) inc() { 500 l.version++ 501 } 502 503 // For the comments below, appends are annotated with the key being introduced, 504 // while incs are annotated with the version the inc results in. 
505 506 func nextMax(prev listenerKeys, do func(listenerKeys) listenerKeys) listenerKeys { 507 return do(append(listenerKeys(nil), prev...)) 508 } 509 510 var max080 = nextMax(nil, func(listenerKeys) listenerKeys { 511 return listenerKeys{ 512 k(zkBroker, rBroker), // 0 produce 513 k(zkBroker, rBroker, rController), // 1 fetch 514 k(zkBroker, rBroker), // 2 list offset 515 k(zkBroker, rBroker), // 3 metadata 516 k(zkBroker), // 4 leader and isr 517 k(zkBroker), // 5 stop replica 518 k(zkBroker), // 6 update metadata, actually not supported for a bit 519 k(zkBroker, rController), // 7 controlled shutdown, actually not supported for a bit 520 } 521 }) 522 523 var max081 = nextMax(max080, func(v listenerKeys) listenerKeys { 524 return append(v, 525 k(zkBroker, rBroker), // 8 offset commit KAFKA-965 db37ed0054 526 k(zkBroker, rBroker), // 9 offset fetch (same) 527 ) 528 }) 529 530 var max082 = nextMax(max081, func(v listenerKeys) listenerKeys { 531 v[8].inc() // 1 offset commit KAFKA-1462 532 v[9].inc() // 1 offset fetch KAFKA-1841 161b1aa16e I think? 533 return append(v, 534 k(zkBroker, rBroker), // 10 find coordinator KAFKA-1012 a670537aa3 535 k(zkBroker, rBroker), // 11 join group (same) 536 k(zkBroker, rBroker), // 12 heartbeat (same) 537 ) 538 }) 539 540 var max090 = nextMax(max082, func(v listenerKeys) listenerKeys { 541 v[0].inc() // 1 produce KAFKA-2136 436b7ddc38; KAFKA-2083 ?? 
KIP-13 542 v[1].inc() // 1 fetch (same) 543 v[6].inc() // 1 update metadata KAFKA-2411 d02ca36ca1 544 v[7].inc() // 1 controlled shutdown (same) 545 v[8].inc() // 2 offset commit KAFKA-1634 546 return append(v, 547 k(zkBroker, rBroker), // 13 leave group KAFKA-2397 636e14a991 548 k(zkBroker, rBroker), // 14 sync group KAFKA-2464 86eb74d923 549 k(zkBroker, rBroker), // 15 describe groups KAFKA-2687 596c203af1 550 k(zkBroker, rBroker), // 16 list groups KAFKA-2687 596c203af1 551 ) 552 }) 553 554 var max0100 = nextMax(max090, func(v listenerKeys) listenerKeys { 555 v[0].inc() // 2 produce KAFKA-3025 45c8195fa1 KIP-31 KIP-32 556 v[1].inc() // 2 fetch (same) 557 v[3].inc() // 1 metadata KAFKA-3306 33d745e2dc 558 v[6].inc() // 2 update metadata KAFKA-1215 951e30adc6 559 return append(v, 560 k(zkBroker, rBroker, rController), // 17 sasl handshake KAFKA-3149 5b375d7bf9 561 k(zkBroker, rBroker, rController), // 18 api versions KAFKA-3307 8407dac6ee 562 ) 563 }) 564 565 var max0101 = nextMax(max0100, func(v listenerKeys) listenerKeys { 566 v[1].inc() // 3 fetch KAFKA-2063 d04b0998c0 KIP-74 567 v[2].inc() // 1 list offset KAFKA-4148 eaaa433fc9 KIP-79 568 v[3].inc() // 2 metadata KAFKA-4093 ecc1fb10fa KIP-78 569 v[11].inc() // 1 join group KAFKA-3888 40b1dd3f49 KIP-62 570 return append(v, 571 k(zkBroker, rBroker, rController), // 19 create topics KAFKA-2945 fc47b9fa6b 572 k(zkBroker, rBroker, rController), // 20 delete topics KAFKA-2946 539633ba0e 573 ) 574 }) 575 576 var max0102 = nextMax(max0101, func(v listenerKeys) listenerKeys { 577 v[6].inc() // 3 update metadata KAFKA-4565 d25671884b KIP-103 578 v[19].inc() // 1 create topics KAFKA-4591 da57bc27e7 KIP-108 579 return v 580 }) 581 582 var max0110 = nextMax(max0102, func(v listenerKeys) listenerKeys { 583 v[0].inc() // 3 produce KAFKA-4816 5bd06f1d54 KIP-98 584 v[1].inc() // 4 fetch (same) 585 v[1].inc() // 5 fetch KAFKA-4586 8b05ad406d KIP-107 586 v[3].inc() // 4 metadata KAFKA-5291 7311dcbc53 (3 below) 587 v[9].inc() // 
2 offset fetch KAFKA-3853 c2d9b95f36 KIP-98 588 v[10].inc() // 1 find coordinator KAFKA-5043 d0e7c6b930 KIP-98 589 v = append(v, 590 k(zkBroker, rBroker), // 21 delete records KAFKA-4586 see above 591 k(zkBroker, rBroker), // 22 init producer id KAFKA-4817 bdf4cba047 KIP-98 (raft added in KAFKA-12620 e97cff2702b6ba836c7925caa36ab18066a7c95d KIP-730) 592 k(zkBroker, rBroker), // 23 offset for leader epoch KAFKA-1211 0baea2ac13 KIP-101 593 594 k(zkBroker, rBroker), // 24 add partitions to txn KAFKA-4990 865d82af2c KIP-98 (raft 3.0 6e857c531f14d07d5b05f174e6063a124c917324) 595 k(zkBroker, rBroker), // 25 add offsets to txn (same, same raft) 596 k(zkBroker, rBroker), // 26 end txn (same, same raft) 597 k(zkBroker, rBroker), // 27 write txn markers (same) 598 k(zkBroker, rBroker), // 28 txn offset commit (same, same raft) 599 600 // raft broker / controller added in 5b0c58ed53c420e93957369516f34346580dac95 601 k(zkBroker, rBroker, rController), // 29 describe acls KAFKA-3266 9815e18fef KIP-140 602 k(zkBroker, rBroker, rController), // 30 create acls (same) 603 k(zkBroker, rBroker, rController), // 31 delete acls (same) 604 605 k(zkBroker, rBroker), // 32 describe configs KAFKA-3267 972b754536 KIP-133 606 k(zkBroker, rBroker, rController), // 33 alter configs (same) (raft broker 3.0 6e857c531f14d07d5b05f174e6063a124c917324, controller 273d66479dbee2398b09e478ffaf996498d1ab34) 607 ) 608 609 // KAFKA-4954 0104b657a1 KIP-124 610 v[2].inc() // 2 list offset (reused in e71dce89c0 KIP-98) 611 v[3].inc() // 3 metadata 612 v[8].inc() // 3 offset commit 613 v[9].inc() // 3 offset fetch 614 v[11].inc() // 2 join group 615 v[12].inc() // 1 heartbeat 616 v[13].inc() // 1 leave group 617 v[14].inc() // 1 sync group 618 v[15].inc() // 1 describe groups 619 v[16].inc() // 1 list group 620 v[18].inc() // 1 api versions 621 v[19].inc() // 2 create topics 622 v[20].inc() // 1 delete topics 623 624 return v 625 }) 626 627 var max100 = nextMax(max0110, func(v listenerKeys) listenerKeys { 
628 v[0].inc() // 4 produce KAFKA-4763 fc93fb4b61 KIP-112 629 v[1].inc() // 6 fetch (same) 630 v[3].inc() // 5 metadata (same) 631 v[4].inc() // 1 leader and isr (same) 632 v[6].inc() // 4 update metadata (same) 633 634 v[0].inc() // 5 produce KAFKA-5793 94692288be 635 v[17].inc() // 1 sasl handshake KAFKA-4764 8fca432223 KIP-152 636 637 return append(v, 638 k(zkBroker, rBroker), // 34 alter replica log dirs KAFKA-5694 adefc8ea07 KIP-113 639 k(zkBroker, rBroker), // 35 describe log dirs (same) 640 k(zkBroker, rBroker, rController), // 36 sasl authenticate KAFKA-4764 (see above) 641 k(zkBroker, rBroker, rController), // 37 create partitions KAFKA-5856 5f6393f9b1 KIP-195 (raft 3.0 6e857c531f14d07d5b05f174e6063a124c917324) 642 ) 643 }) 644 645 var max110 = nextMax(max100, func(v listenerKeys) listenerKeys { 646 v = append(v, 647 k(zkBroker), // 38 create delegation token KAFKA-4541 27a8d0f9e7 under KAFKA-1696 KIP-48 648 k(zkBroker), // 39 renew delegation token (same) 649 k(zkBroker), // 40 expire delegation token (same) 650 k(zkBroker), // 41 describe delegation token (same) 651 k(zkBroker, rBroker), // 42 delete groups KAFKA-6275 1ed6da7cc8 KIP-229 652 ) 653 654 v[1].inc() // 7 fetch KAFKA-6254 7fe1c2b3d3 KIP-227 655 v[32].inc() // 1 describe configs KAFKA-6241 b814a16b96 KIP-226 656 657 return v 658 }) 659 660 var max200 = nextMax(max110, func(v listenerKeys) listenerKeys { 661 v[0].inc() // 6 produce KAFKA-6028 1facab387f KIP-219 662 v[1].inc() // 8 fetch (same) 663 v[2].inc() // 3 list offset (same) 664 v[3].inc() // 6 metadata (same) 665 v[8].inc() // 4 offset commit (same) 666 v[9].inc() // 4 offset fetch (same) 667 v[10].inc() // 2 find coordinator (same) 668 v[11].inc() // 3 join group (same) 669 v[12].inc() // 2 heartbeat (same) 670 v[13].inc() // 2 leave group (same) 671 v[14].inc() // 2 sync group (same) 672 v[15].inc() // 2 describe groups (same) 673 v[16].inc() // 2 list group (same) 674 v[18].inc() // 2 api versions (same) 675 v[19].inc() // 3 create 
topics (same) 676 v[20].inc() // 2 delete topics (same) 677 v[21].inc() // 1 delete records (same) 678 v[22].inc() // 1 init producer id (same) 679 v[24].inc() // 1 add partitions to txn (same) 680 v[25].inc() // 1 add offsets to txn (same) 681 v[26].inc() // 1 end txn (same) 682 v[28].inc() // 1 txn offset commit (same) 683 // 29, 30, 31 bumped below, but also had throttle changes 684 v[32].inc() // 2 describe configs (same) 685 v[33].inc() // 1 alter configs (same) 686 v[34].inc() // 1 alter replica log dirs (same) 687 v[35].inc() // 1 describe log dirs (same) 688 v[37].inc() // 1 create partitions (same) 689 v[38].inc() // 1 create delegation token (same) 690 v[39].inc() // 1 renew delegation token (same) 691 v[40].inc() // 1 expire delegation token (same) 692 v[41].inc() // 1 describe delegation token (same) 693 v[42].inc() // 1 delete groups (same) 694 695 v[29].inc() // 1 describe acls KAFKA-6841 b3aa655a70 KIP-290 696 v[30].inc() // 1 create acls (same) 697 v[31].inc() // 1 delete acls (same) 698 699 v[23].inc() // 1 offset for leader epoch KAFKA-6361 9679c44d2b KIP-279 700 return v 701 }) 702 703 var max210 = nextMax(max200, func(v listenerKeys) listenerKeys { 704 v[8].inc() // 5 offset commit KAFKA-4682 418a91b5d4 KIP-211 705 706 v[20].inc() // 3 delete topics KAFKA-5975 04770916a7 KIP-322 707 708 v[1].inc() // 9 fetch KAFKA-7333 05ba5aa008 KIP-320 709 v[2].inc() // 4 list offset (same) 710 v[3].inc() // 7 metadata (same) 711 v[8].inc() // 6 offset commit (same) 712 v[9].inc() // 5 offset fetch (same) 713 v[23].inc() // 2 offset for leader epoch (same, also in Kafka PR #5635 79ad9026a6) 714 v[28].inc() // 2 txn offset commit (same) 715 716 v[0].inc() // 7 produce KAFKA-4514 741cb761c5 KIP-110 717 v[1].inc() // 10 fetch (same) 718 return v 719 }) 720 721 var max220 = nextMax(max210, func(v listenerKeys) listenerKeys { 722 v[2].inc() // 5 list offset KAFKA-2334 152292994e KIP-207 723 v[11].inc() // 4 join group KAFKA-7824 9a9310d074 KIP-394 724 v[36].inc() 
// 1 sasl authenticate KAFKA-7352 e8a3bc7425 KIP-368 725 726 v[4].inc() // 2 leader and isr KAFKA-7235 2155c6d54b KIP-380 727 v[5].inc() // 1 stop replica (same) 728 v[6].inc() // 5 update metadata (same) 729 v[7].inc() // 2 controlled shutdown (same) 730 731 return append(v, 732 k(zkBroker, rBroker, rController), // 43 elect preferred leaders KAFKA-5692 269b65279c KIP-183 (raft 3.0 6e857c531f14d07d5b05f174e6063a124c917324) 733 ) 734 }) 735 736 var max230 = nextMax(max220, func(v listenerKeys) listenerKeys { 737 v[3].inc() // 8 metadata KAFKA-7922 a42f16f980 KIP-430 738 v[15].inc() // 3 describe groups KAFKA-7922 f11fa5ef40 KIP-430 739 740 v[1].inc() // 11 fetch KAFKA-8365 e2847e8603 KIP-392 741 v[23].inc() // 3 offset for leader epoch (same) 742 743 v[11].inc() // 5 join group KAFKA-7862 0f995ba6be KIP-345 744 v[8].inc() // 7 offset commit KAFKA-8225 9fa331b811 KIP-345 745 v[12].inc() // 3 heartbeat (same) 746 v[14].inc() // 3 sync group (same) 747 748 return append(v, 749 k(zkBroker, rBroker, rController), // 44 incremental alter configs KAFKA-7466 3b1524c5df KIP-339 750 ) 751 }) 752 753 var max240 = nextMax(max230, func(v listenerKeys) listenerKeys { 754 v[4].inc() // 3 leader and isr KAFKA-8345 81900d0ba0 KIP-455 755 v[15].inc() // 4 describe groups KAFKA-8538 f8db022b08 KIP-345 756 v[19].inc() // 4 create topics KAFKA-8305 8e161580b8 KIP-464 757 v[43].inc() // 1 elect preferred leaders KAFKA-8286 121308cc7a KIP-460 758 v = append(v, 759 // raft added in e07de97a4ce730a2755db7eeacb9b3e1f69a12c8 for the following two 760 k(zkBroker, rBroker, rController), // 45 alter partition reassignments KAFKA-8345 81900d0ba0 KIP-455 761 k(zkBroker, rBroker, rController), // 46 list partition reassignments (same) 762 763 k(zkBroker, rBroker), // 47 offset delete KAFKA-8730 e24d0e22ab KIP-496 764 ) 765 766 v[13].inc() // 3 leave group KAFKA-8221 74c90f46c3 KIP-345 767 768 // introducing flexible versions; 24 were bumped 769 v[3].inc() // 9 metadata KAFKA-8885 apache/kafka#7325 
KIP-482 770 v[4].inc() // 4 leader and isr (same) 771 v[5].inc() // 2 stop replica (same) 772 v[6].inc() // 6 update metadata (same) 773 v[7].inc() // 3 controlled shutdown (same) 774 v[8].inc() // 8 offset commit (same) 775 v[9].inc() // 6 offset fetch (same) 776 v[10].inc() // 3 find coordinator (same) 777 v[11].inc() // 6 join group (same) 778 v[12].inc() // 4 heartbeat (same) 779 v[13].inc() // 4 leave group (same) 780 v[14].inc() // 4 sync group (same) 781 v[15].inc() // 5 describe groups (same) 782 v[16].inc() // 3 list group (same) 783 v[18].inc() // 3 api versions (same, also KIP-511 [non-flexible fields added]) 784 v[19].inc() // 5 create topics (same) 785 v[20].inc() // 4 delete topics (same) 786 v[22].inc() // 2 init producer id (same) 787 v[38].inc() // 2 create delegation token (same) 788 v[42].inc() // 2 delete groups (same) 789 v[43].inc() // 2 elect preferred leaders (same) 790 v[44].inc() // 1 incremental alter configs (same) 791 // also 45, 46; not bumped since in same release 792 793 // Create topics (19) was bumped up to 5 in KAFKA-8907 5d0052fe00 794 // KIP-525, then 6 in the above bump, then back down to 5 once the 795 // tagged PR was merged (KAFKA-8932 1f1179ea64 for the bump down). 
796 797 v[0].inc() // 8 produce KAFKA-8729 f6f24c4700 KIP-467 798 799 return v 800 }) 801 802 var max250 = nextMax(max240, func(v listenerKeys) listenerKeys { 803 v[22].inc() // 3 init producer id KAFKA-8710 fecb977b25 KIP-360 804 v[9].inc() // 7 offset fetch KAFKA-9346 6da70f9b95 KIP-447 805 806 // more flexible versions, KAFKA-9420 0a2569e2b99 KIP-482 807 // 6 bumped, then sasl handshake reverted later in 1a8dcffe4 808 v[36].inc() // 2 sasl authenticate 809 v[37].inc() // 2 create partitions 810 v[39].inc() // 2 renew delegation token 811 v[40].inc() // 2 expire delegation token 812 v[41].inc() // 2 describe delegation token 813 814 v[28].inc() // 3 txn offset commit KAFKA-9365 ed7c071e07f KIP-447 815 816 v[29].inc() // 2 describe acls KAFKA-9026 40b35178e5 KIP-482 (for flexible versions) 817 v[30].inc() // 2 create acls KAFKA-9027 738e14edb KIP-482 (flexible) 818 v[31].inc() // 2 delete acls KAFKA-9028 738e14edb KIP-482 (flexible) 819 820 v[11].inc() // 7 join group KAFKA-9437 96c4ce480 KIP-559 821 v[14].inc() // 5 sync group (same) 822 823 return v 824 }) 825 826 var max260 = nextMax(max250, func(v listenerKeys) listenerKeys { 827 v[21].inc() // 2 delete records KAFKA-8768 f869e33ab KIP-482 (opportunistic bump for flexible versions) 828 v[35].inc() // 2 describe log dirs KAFKA-9435 4f1e8331ff9 KIP-482 (same) 829 830 v = append(v, 831 k(zkBroker, rBroker), // 48 describe client quotas KAFKA-7740 227a7322b KIP-546 (raft in 5964401bf9aab611bd4a072941bd1c927e044258) 832 k(zkBroker, rBroker, rController), // 49 alter client quotas (same) 833 ) 834 835 v[5].inc() // 3 stop replica KAFKA-9539 7c7d55dbd KIP-570 836 837 v[16].inc() // 4 list group KAFKA-9130 fe948d39e KIP-518 838 v[32].inc() // 3 describe configs KAFKA-9494 af3b8b50f2 KIP-569 839 840 return v 841 }) 842 843 var max270 = nextMax(max260, func(v listenerKeys) listenerKeys { 844 // KAFKA-10163 a5ffd1ca44c KIP-599 845 v[37].inc() // 3 create partitions 846 v[19].inc() // 6 create topics (same) 847 
v[20].inc() // 5 delete topics (same)

	// KAFKA-9911 b937ec7567 KIP-588
	v[22].inc() // 4 init producer id
	v[24].inc() // 2 add partitions to txn
	v[25].inc() // 2 add offsets to txn
	v[26].inc() // 2 end txn

	v = append(v,
		k(zkBroker, rBroker, rController), // 50 describe user scram creds, KAFKA-10259 e8524ccd8fca0caac79b844d87e98e9c055f76fb KIP-554; 38c409cf33c kraft
		k(zkBroker, rBroker, rController), // 51 alter user scram creds, same
	)

	// KAFKA-10435 634c9175054cc69d10b6da22ea1e95edff6a4747 KIP-595
	// This opted in fetch request to flexible versions.
	//
	// KAFKA-10487: further change in aa5263fba903c85812c0c31443f7d49ee371e9db
	v[1].inc() // 12 fetch

	// KAFKA-10492 b7c8490cf47b0c18253d6a776b2b35c76c71c65d KIP-595
	//
	// These are the first requests that are raft only.
	v = append(v,
		k(rController),          // 52 vote
		k(rController),          // 53 begin quorum epoch
		k(rController),          // 54 end quorum epoch
		k(rBroker, rController), // 55 describe quorum
	)

	// KAFKA-8836 57de67db22eb373f92ec5dd449d317ed2bc8b8d1 KIP-497
	v = append(v,
		k(zkBroker, rController), // 56 alter isr
	)

	// KAFKA-10028 fb4f297207ef62f71e4a6d2d0dac75752933043d KIP-584
	return append(v,
		k(zkBroker, rBroker, rController), // 57 update features (rbroker 3.0 6e857c531f14d07d5b05f174e6063a124c917324; rcontroller 3.2 55ff5d360381af370fe5b3a215831beac49571a4 KIP-778 KAFKA-13823)
	)
})

// max280 is built by handing max270 and the function below to nextMax.
//
// Reading guide for this and the following tables, based on the trailing
// comments in this file:
//   - An appended k(...) entry adds a new key; the number leading its
//     trailing comment is the key's index in v (e.g. "56 alter isr" is later
//     addressed as v[56]).
//   - A v[key].inc() call is annotated with the version the key ends up at
//     (e.g. metadata is annotated 10 and then 11 within this function).
//   - The comment above each group names the Kafka ticket, commit and KIP
//     the change is attributed to.
//
// NOTE(review): going by the name this is presumably the Kafka 2.8.0
// release; nextMax, k and inc are defined outside this section — confirm
// their exact semantics there.
var max280 = nextMax(max270, func(v listenerKeys) listenerKeys {
	// KAFKA-10181 KIP-590
	// NOTE(review): unlike the other entries, no commit hash is recorded
	// for this change.
	v = append(v,
		k(zkBroker, rController), // 58 envelope, controller first, zk in KAFKA-14446 8b045dcbf6b89e1a9594ff95642d4882765e4b0d KIP-866 Kafka 3.4
	)

	// KAFKA-10729 85f94d50271c952c3e9ee49c4fc814c0da411618 KIP-482
	// (flexible bumps)
	v[0].inc()  // 9 produce
	v[2].inc()  // 6 list offsets
	v[23].inc() // 4 offset for leader epoch
	v[24].inc() // 3 add partitions to txn
	v[25].inc() // 3 add offsets to txn
	v[26].inc() // 3 end txn
	v[27].inc() // 1 write txn markers
	v[32].inc() // 4 describe configs
	v[33].inc() // 2 alter configs
	v[34].inc() // 2 alter replica log dirs
	v[48].inc() // 1 describe client quotas
	v[49].inc() // 1 alter client quotas

	// KAFKA-10547 5c921afa4a593478f7d1c49e5db9d787558d0d5e KIP-516
	v[3].inc() // 10 metadata
	v[6].inc() // 7 update metadata

	// KAFKA-10545 1dd1e7f945d7a8c1dc177223cd88800680f1ff46 KIP-516
	v[4].inc() // 5 leader and isr

	// KAFKA-10427 2023aed59d863278a6302e03066d387f994f085c KIP-630
	v = append(v,
		k(rController), // 59 fetch snapshot
	)

	// KAFKA-12204 / KAFKA-10851 302eee63c479fd4b955c44f1058a5e5d111acb57 KIP-700
	v = append(v,
		k(zkBroker, rBroker), // 60 describe cluster
	)

	// KAFKA-12212 7a1d1d9a69a241efd68e572badee999229b3942f KIP-700
	v[3].inc() // 11 metadata

	// KAFKA-10764 4f588f7ca2a1c5e8dd845863da81425ac69bac92 KIP-516
	v[19].inc() // 7 create topics
	v[20].inc() // 6 delete topics

	// KAFKA-12238 e9edf104866822d9e6c3b637ffbf338767b5bf27 KIP-664
	v = append(v,
		k(zkBroker, rBroker), // 61 describe producers
	)

	// KAFKA-12248 a022072df3c8175950c03263d2bbf2e3ea7a7a5d KIP-500
	// (commit mentions KIP-500, these are actually described in KIP-631)
	// Broker registration was later updated in d9bb2ef596343da9402bff4903b129cff1f7c22b
	v = append(v,
		k(rController), // 62 broker registration
		k(rController), // 63 broker heartbeat
	)

	// KAFKA-12249 3f36f9a7ca153a9d221f6bedeb7d1503aa18eff1 KIP-500 / KIP-631
	// Renamed from Decommission to Unregister in 06dce721ec0185d49fac37775dbf191d0e80e687
	v = append(v,
		// kraft broker added in 7143267f71ca0c14957d8560fbc42a5f8aac564d
		k(rBroker, rController), // 64 unregister broker
	)
	return v
})

// max300 is built by handing max280 and the function below to nextMax. It
// appends keys 65 through 67 and bumps list offsets, find coordinator and
// offset fetch.
var max300 = nextMax(max280, func(v listenerKeys) listenerKeys {
	// KAFKA-12267 3f09fb97b6943c0612488dfa8e5eab8078fd7ca0 KIP-664
	v = append(v,
		k(zkBroker, rBroker), // 65 describe transactions
	)

	// KAFKA-12369 3708a7c6c1ecf1304f091dda1e79ae53ba2df489 KIP-664
	v = append(v,
		k(zkBroker, rBroker), // 66 list transactions
	)

	// KAFKA-12620 72d108274c98dca44514007254552481c731c958 KIP-730
	// raft broker added in e97cff2702b6ba836c7925caa36ab18066a7c95d
	v = append(v,
		k(zkBroker, rController), // 67 allocate producer ids
	)

	// KAFKA-12541 bd72ef1bf1e40feb3bc17349a385b479fa5fa530 KIP-734
	v[2].inc() // 7 list offsets

	// KAFKA-12663 f5d5f654db359af077088685e29fbe5ea69616cf KIP-699
	v[10].inc() // 4 find coordinator

	// KAFKA-12234 e00c0f3719ad0803620752159ef8315d668735d6 KIP-709
	v[9].inc() // 8 offset fetch

	return v
})

// max310 is built by handing max300 and the function below to nextMax. No
// keys are appended; only fetch and metadata are bumped.
var max310 = nextMax(max300, func(v listenerKeys) listenerKeys {
	// KAFKA-10580 2b8aff58b575c199ee8372e5689420c9d77357a5 KIP-516
	v[1].inc() // 13 fetch

	// KAFKA-10744 1d22b0d70686aef5689b775ea2ea7610a37f3e8c KIP-516
	v[3].inc() // 12 metadata

	return v
})

// max320 is built by handing max310 and the function below to nextMax. No
// keys are appended. Join group (key 11) is bumped twice here, once per
// ticket, which is why it appears under two separate headings.
var max320 = nextMax(max310, func(v listenerKeys) listenerKeys {
	// KAFKA-13495 69645f1fe5103adb00de6fa43152e7df989f3aea KIP-800
	v[11].inc() // 8 join group

	// KAFKA-13496 bf609694f83931990ce63e0123f811e6475820c5 KIP-800
	v[13].inc() // 5 leave group

	// KAFKA-13527 31fca1611a6780e8a8aa3ac21618135201718e32 KIP-784
	v[35].inc() // 3 describe log dirs

	// KAFKA-13435 c8fbe26f3bd3a7c018e7619deba002ee454208b9 KIP-814
	v[11].inc() // 9 join group

	// KAFKA-13587 52621613fd386203773ba93903abd50b46fa093a KIP-704
	v[4].inc()  // 6 leader and isr
	v[56].inc() // 1 alter isr => alter partition

	return v
})

// max330 is built by handing max320 and the function below to nextMax. No
// keys are appended; existing keys are bumped only.
var max330 = nextMax(max320, func(v listenerKeys) listenerKeys {
	// KAFKA-13823 55ff5d360381af370fe5b3a215831beac49571a4 KIP-778
	v[57].inc() // 1 update features

	// KAFKA-13958 4fcfd9ddc4a8da3d4cfbb69268c06763352e29a9 KIP-827
	v[35].inc() // 4 describe log dirs

	// KAFKA-841 f83d95d9a28 KIP-841
	// NOTE(review): the ticket number above looks copied from the KIP
	// number — confirm the actual KAFKA ticket.
	v[56].inc() // 2 alter partition

	// KAFKA-13888 a126e3a622f KIP-836
	v[55].inc() // 1 describe quorum

	// KAFKA-6945 d65d8867983 KIP-373
	v[29].inc() // 3 describe acls
	v[30].inc() // 3 create acls
	v[31].inc() // 3 delete acls
	v[38].inc() // 3 create delegation token
	v[41].inc() // 3 describe delegation token

	return v
})

// max340 is built by handing max330 and the function below to nextMax. No
// keys are appended; the four bumps below are all attributed to the commits
// listed in the leading comment.
var max340 = nextMax(max330, func(v listenerKeys) listenerKeys {
	// KAFKA-14304 7b7e40a536a79cebf35cc278b9375c8352d342b9 KIP-866
	// KAFKA-14448 67c72596afe58363eceeb32084c5c04637a33831 added BrokerRegistration
	// KAFKA-14493 db490707606855c265bc938e1b236070e0e2eba5 changed BrokerRegistration
	// KAFKA-14304 0bb05d8679b684ad8fbb2eb40dfc00066186a75a changed BrokerRegistration back to a bool...
	// 5b521031edea8ea7cbcca7dc24a58429423740ff added tag to ApiVersions
	v[4].inc()  // 7 leader and isr
	v[5].inc()  // 4 stop replica
	v[6].inc()  // 8 update metadata
	v[62].inc() // 1 broker registration
	return v
})

// max350 is built by handing max340 and the function below to nextMax. No
// keys are appended. Fetch (key 1) is bumped twice: once under KIP-405 and
// once under KIP-903.
var max350 = nextMax(max340, func(v listenerKeys) listenerKeys {
	// KAFKA-13369 7146ac57ba9ddd035dac992b9f188a8e7677c08d KIP-405
	v[1].inc() // 14 fetch
	v[2].inc() // 8 list offsets

	v[1].inc()  // 15 fetch // KAFKA-14617 79b5f7f1ce2 KIP-903
	v[56].inc() // 3 alter partition // KAFKA-14617 8c88cdb7186b1d594f991eb324356dcfcabdf18a KIP-903
	return v
})

// max360 is built by handing max350 and the function below to nextMax. Its
// only change is a single bump of add partitions to txn (key 24).
var max360 = nextMax(max350, func(v listenerKeys) listenerKeys {
	// KAFKA-14402 29a1a16668d76a1cc04ec9e39ea13026f2dce1de KIP-890
	// Later commit swapped to stable
	v[24].inc() // 4 add partitions to txn
	return v
})

var (
	// maxStable is set to max360, the last release table defined above.
	maxStable = max360
	// maxTip is derived from maxStable via nextMax with a function that
	// returns its input unchanged, so nothing is appended or bumped on top
	// of maxStable here.
	// NOTE(review): presumably the place to record unreleased (tip) changes
	// ahead of the next release table — confirm.
	maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys {
		return v
	})
)