...

Source file src/github.com/twmb/franz-go/pkg/kversion/kversion.go

Documentation: github.com/twmb/franz-go/pkg/kversion

     1  // Package kversion specifies versions for Kafka request keys.
     2  //
     3  // Kafka technically has internal broker versions that bump multiple times per
     4  // release. This package only defines releases and tip.
     5  package kversion
     6  
     7  import (
     8  	"bytes"
     9  	"fmt"
    10  	"text/tabwriter"
    11  
    12  	"github.com/twmb/franz-go/pkg/kmsg"
    13  )
    14  
// Versions is a list of versions, with each item corresponding to a Kafka key
// and each item's value corresponding to the max version supported.
//
// Minimum versions are not currently tracked because all keys have a minimum
// version of zero. The internals of a Versions may change in the future to
// support minimum versions; the outward facing API of Versions should not
// change to support this.
//
// As well, supported features may be added in the future.
//
// The zero value is an empty Versions that contains no keys.
type Versions struct {
	// k2v maps a key (the slice index) to the max version for that key.
	//
	// If any version is -1, then it is left out in that version.
	// This was first done in version 2.7.0, where Kafka added support
	// for 52, 53, 54, 55, but it was not a part of the 2.7.0 release,
	// so ApiVersionsResponse goes from 51 to 56.
	//
	// Keys at or beyond len(k2v) are treated the same as keys set to -1.
	k2v []int16
}
    31  
    32  // FromApiVersionsResponse returns a Versions from a kmsg.ApiVersionsResponse.
    33  func FromApiVersionsResponse(r *kmsg.ApiVersionsResponse) *Versions {
    34  	var v Versions
    35  	for _, key := range r.ApiKeys {
    36  		v.SetMaxKeyVersion(key.ApiKey, key.MaxVersion)
    37  	}
    38  	return &v
    39  }
    40  
    41  // HasKey returns true if the versions contains the given key.
    42  func (vs *Versions) HasKey(k int16) bool {
    43  	_, has := vs.LookupMaxKeyVersion(k)
    44  	return has
    45  }
    46  
    47  // LookupMaxKeyVersion returns the version for the given key and whether the
    48  // key exists. If the key does not exist, this returns (-1, false).
    49  func (vs *Versions) LookupMaxKeyVersion(k int16) (int16, bool) {
    50  	if k < 0 {
    51  		return -1, false
    52  	}
    53  	if int(k) >= len(vs.k2v) {
    54  		return -1, false
    55  	}
    56  	version := vs.k2v[k]
    57  	if version < 0 {
    58  		return -1, false
    59  	}
    60  	return version, true
    61  }
    62  
    63  // SetMaxKeyVersion sets the max version for the given key.
    64  //
    65  // Setting a version to -1 unsets the key.
    66  //
    67  // Versions are backed by a slice; if the slice is not long enough, it is
    68  // extended to fit the key.
    69  func (vs *Versions) SetMaxKeyVersion(k, v int16) {
    70  	if v < 0 {
    71  		v = -1
    72  	}
    73  	// If the version is < 0, we are unsetting a version. If we are
    74  	// unsetting a version that is more than the amount of keys we already
    75  	// have, we have no reason to unset.
    76  	if k < 0 || v < 0 && int(k) >= len(vs.k2v)+1 {
    77  		return
    78  	}
    79  	needLen := int(k) + 1
    80  	for len(vs.k2v) < needLen {
    81  		vs.k2v = append(vs.k2v, -1)
    82  	}
    83  	vs.k2v[k] = v
    84  }
    85  
    86  // Equal returns whether two versions are equal.
    87  func (vs *Versions) Equal(other *Versions) bool {
    88  	// We allow the version slices to be of different lengths, so long as
    89  	// the versions for keys in one and not the other are -1.
    90  	//
    91  	// Basically, all non-negative-one keys must be equal.
    92  	long, short := vs.k2v, other.k2v
    93  	if len(short) > len(long) {
    94  		long, short = short, long
    95  	}
    96  	for i, v := range short {
    97  		if v != long[i] {
    98  			return false
    99  		}
   100  	}
   101  	for _, v := range long[len(short):] {
   102  		if v >= 0 {
   103  			return false
   104  		}
   105  	}
   106  	return true
   107  }
   108  
   109  // EachMaxKeyVersion calls fn for each key and max version
   110  func (vs *Versions) EachMaxKeyVersion(fn func(k, v int16)) {
   111  	for k, v := range vs.k2v {
   112  		if v >= 0 {
   113  			fn(int16(k), v)
   114  		}
   115  	}
   116  }
   117  
// VersionGuessOpt is an option to change how version guessing is done.
//
// Options are created with SkipKeys, TryRaftBroker, or TryRaftController. The
// interface's only method is unexported, so it cannot be implemented outside
// of this package.
type VersionGuessOpt interface {
	// apply mutates the guess configuration in place.
	apply(*guessCfg)
}
   122  
   123  type guessOpt struct{ fn func(*guessCfg) }
   124  
   125  func (opt guessOpt) apply(cfg *guessCfg) { opt.fn(cfg) }
   126  
   127  // SkipKeys skips the given keys while guessing versions.
   128  //
   129  // The current default is to skip keys that are only used by brokers:
   130  //
   131  //	 4: LeaderAndISR
   132  //	 5: StopReplica
   133  //	 6: UpdateMetadata
   134  //	 7: ControlledShutdown
   135  //	27: WriteTxnMarkers
   136  func SkipKeys(keys ...int16) VersionGuessOpt {
   137  	return guessOpt{func(cfg *guessCfg) { cfg.skipKeys = keys }}
   138  }
   139  
   140  // TryRaftBroker changes from guessing the version for a classical ZooKeeper
   141  // based broker to guessing for a raft based broker (v2.8+).
   142  //
   143  // Note that with raft, there can be a TryRaftController attempt as well.
   144  func TryRaftBroker() VersionGuessOpt {
   145  	return guessOpt{func(cfg *guessCfg) { cfg.listener = rBroker }}
   146  }
   147  
   148  // TryRaftController changes from guessing the version for a classical
   149  // ZooKeeper based broker to guessing for a raft based controller broker
   150  // (v2.8+).
   151  //
   152  // Note that with raft, there can be a TryRaftBroker attempt as well. Odds are
   153  // that if you are an end user speaking to a raft based Kafka cluster, you are
   154  // speaking to a raft broker. The controller is specifically for broker to
   155  // broker communication.
   156  func TryRaftController() VersionGuessOpt {
   157  	return guessOpt{func(cfg *guessCfg) { cfg.listener = rController }}
   158  }
   159  
// guessCfg is the configuration that VersionGuessOpt options mutate before a
// version guess is performed.
type guessCfg struct {
	// skipKeys are keys that are not required while guessing: a skipped
	// key is only compared if the versions being guessed have it set.
	skipKeys []int16
	// listener selects which listener's keys the guess compares against.
	listener listener
}
   164  
   165  // VersionGuess attempts to guess which version of Kafka these versions belong
   166  // to. If an exact match can be determined, this returns a string in the format
   167  // v0.#.# or v#.# (depending on whether Kafka is pre-1.0 or post). For
   168  // example, v0.8.0 or v2.7.
   169  //
   170  // Patch numbers are not included in the guess as it is not possible to
   171  // determine the Kafka patch version being used as a client.
   172  //
   173  // If the version is determined to be higher than kversion knows of or is tip,
   174  // this package returns "at least v#.#".
   175  //
   176  // Custom versions, or in-between versions, are detected and return slightly
   177  // more verbose strings.
   178  //
   179  // Options can be specified to change how version guessing is performed, for
   180  // example, certain keys can be skipped, or the guessing can try evaluating the
   181  // versions as Raft broker based versions.
   182  //
   183  // Internally, this function tries guessing the version against both KRaft and
   184  // Kafka APIs. The more exact match is returned.
   185  func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string {
   186  	standard := vs.versionGuess(opts...)
   187  	raftBroker := vs.versionGuess(append(opts, TryRaftBroker())...)
   188  	raftController := vs.versionGuess(append(opts, TryRaftController())...)
   189  
   190  	// If any of these are exact, return the exact guess.
   191  	for _, g := range []guess{
   192  		standard,
   193  		raftBroker,
   194  		raftController,
   195  	} {
   196  		if g.how == guessExact {
   197  			return g.String()
   198  		}
   199  	}
   200  
   201  	// If any are atLeast, that means it is newer than we can guess and we
   202  	// return the highest version.
   203  	for _, g := range []guess{
   204  		standard,
   205  		raftBroker,
   206  		raftController,
   207  	} {
   208  		if g.how == guessAtLeast {
   209  			return g.String()
   210  		}
   211  	}
   212  
   213  	// This is a custom version. We could do some advanced logic to try to
   214  	// return highest of all three guesses, but that may be inaccurate:
   215  	// KRaft may detect a higher guess because not all requests exist in
   216  	// KRaft. Instead, we just return our standard guess.
   217  	return standard.String()
   218  }
   219  
   220  type guess struct {
   221  	v1  string
   222  	v2  string // for between
   223  	how int8
   224  }
   225  
   226  const (
   227  	guessExact = iota
   228  	guessAtLeast
   229  	guessCustomUnknown
   230  	guessCustomAtLeast
   231  	guessBetween
   232  	guessNotEven
   233  )
   234  
   235  func (g guess) String() string {
   236  	switch g.how {
   237  	case guessExact:
   238  		return g.v1
   239  	case guessAtLeast:
   240  		return "at least " + g.v1
   241  	case guessCustomUnknown:
   242  		return "unknown custom version"
   243  	case guessCustomAtLeast:
   244  		return "unknown custom version at least " + g.v1
   245  	case guessBetween:
   246  		return "between " + g.v1 + " and " + g.v2
   247  	case guessNotEven:
   248  		return "not even " + g.v1
   249  	}
   250  	return g.v1
   251  }
   252  
// versionGuess guesses the release that vs corresponds to for a single
// listener type (zkBroker unless an option changes it).
//
// vs is compared against each known release in ascending order. For each
// release, every key is classified as under, equal to, or over the release's
// max version, and the combined classification decides whether to stop with a
// guess or to move on to the next release. If vs is over every known release,
// an "at least" guess naming the last release is returned.
func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
	cfg := guessCfg{
		listener: zkBroker,
		// Envelope was added in 2.7 for kraft and zkBroker in 3.4; we
		// need to skip it for 2.7 through 3.4 otherwise the version
		// detection fails. We can just skip it generally since there
		// are enough differentiating factors that accurately detecting
		// envelope doesn't matter.
		//
		// TODO: add introduced-version to differentiate some specific
		// keys.
		skipKeys: []int16{4, 5, 6, 7, 27, 58},
	}
	for _, opt := range opts {
		opt.apply(&cfg)
	}

	skip := make(map[int16]bool, len(cfg.skipKeys))
	for _, k := range cfg.skipKeys {
		skip[k] = true
	}

	// last is the name of the most recent release that vs was strictly
	// over; it is empty until vs has been over at least one release.
	var last string
	// cmp holds the non-skipped keys of the release being compared that
	// have not yet been matched against vs. Matched keys are deleted, so
	// anything left after the comparison is a key that vs is missing.
	cmp := make(map[int16]int16, len(maxTip))
	// cmpskip holds the release's versions for skipped keys. A skipped key
	// is only moved into cmp (and thus compared) if vs has that key set.
	cmpskip := make(map[int16]int16)
	for _, comparison := range []struct {
		cmp  listenerKeys
		name string
	}{
		{max080, "v0.8.0"},
		{max081, "v0.8.1"},
		{max082, "v0.8.2"},
		{max090, "v0.9.0"},
		{max0100, "v0.10.0"},
		{max0101, "v0.10.1"},
		{max0102, "v0.10.2"},
		{max0110, "v0.11.0"},
		{max100, "v1.0"},
		{max110, "v1.1"},
		{max200, "v2.0"},
		{max210, "v2.1"},
		{max220, "v2.2"},
		{max230, "v2.3"},
		{max240, "v2.4"},
		{max250, "v2.5"},
		{max260, "v2.6"},
		{max270, "v2.7"},
		{max280, "v2.8"},
		{max300, "v3.0"},
		{max310, "v3.1"},
		{max320, "v3.2"},
		{max330, "v3.3"},
		{max340, "v3.4"},
		{max350, "v3.5"},
		{max360, "v3.6"},
	} {
		// Load this release's keys for the configured listener; keys the
		// listener does not support are filtered to -1 and ignored.
		for k, v := range comparison.cmp.filter(cfg.listener) {
			if v == -1 {
				continue
			}
			k16 := int16(k)
			if skip[k16] {
				cmpskip[k16] = v
			} else {
				cmp[k16] = v
			}
		}

		var under, equal, over bool

		for k, v := range vs.k2v {
			k16 := int16(k)
			if skip[k16] {
				// A skipped key is ignored entirely if we do not
				// have it or the release does not have it;
				// otherwise it is compared like any other key.
				skipv, ok := cmpskip[k16]
				if v == -1 || !ok {
					continue
				}
				cmp[k16] = skipv
			}
			cmpv, has := cmp[k16]
			if has {
				// If our version for this key is less than the
				// comparison versions, then we are less than what we
				// are comparing.
				if v < cmpv {
					under = true
				} else if v > cmpv {
					// Similarly, if our version is more, then we
					// are over what we are comparing.
					over = true
				} else {
					equal = true
				}
				delete(cmp, k16)
			} else if v >= 0 {
				// If what we are comparing to does not even have this
				// key **and** our version is non-negative (i.e. the
				// key is set), then our version is larger than what
				// we are comparing to.
				//
				// We can have a negative version if a key was manually
				// unset.
				over = true
			}
			// If the version is < 0, the key is unset.
		}

		// If our versions did not clear out what we are comparing against, we
		// do not have all keys that we need for this version.
		if len(cmp) > 0 {
			under = true
		}

		current := comparison.name
		switch {
		case under && over:
			// Regardless of equal being true or not, this is a custom version.
			if last != "" {
				return guess{v1: last, how: guessCustomAtLeast}
			}
			return guess{v1: last, how: guessCustomUnknown}

		case under:
			// Regardless of equal being true or not, we have not yet hit
			// this version.
			if last != "" {
				return guess{v1: last, v2: current, how: guessBetween}
			}
			return guess{v1: current, how: guessNotEven}

		case over:
			// Regardless of equal being true or not, we try again.
			last = current

		case equal:
			return guess{v1: current, how: guessExact}
		}
		// At least one of under, equal, or over must be true, so there
		// is no default case.
	}

	// We were over every release we know of.
	return guess{v1: last, how: guessAtLeast}
}
   395  
   396  // String returns a string representation of the versions; the format may
   397  // change.
   398  func (vs *Versions) String() string {
   399  	var buf bytes.Buffer
   400  	w := tabwriter.NewWriter(&buf, 0, 0, 2, ' ', 0)
   401  	for k, v := range vs.k2v {
   402  		if v < 0 {
   403  			continue
   404  		}
   405  		name := kmsg.NameForKey(int16(k))
   406  		if name == "" {
   407  			name = "Unknown"
   408  		}
   409  		fmt.Fprintf(w, "%s\t%d\n", name, v)
   410  	}
   411  	w.Flush()
   412  	return buf.String()
   413  }
   414  
   415  // Stable is a shortcut for the latest _released_ Kafka versions.
   416  //
   417  // This is the default version used in kgo to avoid breaking tip changes.
   418  func Stable() *Versions { return zkBrokerOf(maxStable) }
   419  
   420  // Tip is the latest defined Kafka key versions; this may be slightly out of date.
   421  func Tip() *Versions { return zkBrokerOf(maxTip) }
   422  
   423  func V0_8_0() *Versions  { return zkBrokerOf(max080) }
   424  func V0_8_1() *Versions  { return zkBrokerOf(max081) }
   425  func V0_8_2() *Versions  { return zkBrokerOf(max082) }
   426  func V0_9_0() *Versions  { return zkBrokerOf(max090) }
   427  func V0_10_0() *Versions { return zkBrokerOf(max0100) }
   428  func V0_10_1() *Versions { return zkBrokerOf(max0101) }
   429  func V0_10_2() *Versions { return zkBrokerOf(max0102) }
   430  func V0_11_0() *Versions { return zkBrokerOf(max0110) }
   431  func V1_0_0() *Versions  { return zkBrokerOf(max100) }
   432  func V1_1_0() *Versions  { return zkBrokerOf(max110) }
   433  func V2_0_0() *Versions  { return zkBrokerOf(max200) }
   434  func V2_1_0() *Versions  { return zkBrokerOf(max210) }
   435  func V2_2_0() *Versions  { return zkBrokerOf(max220) }
   436  func V2_3_0() *Versions  { return zkBrokerOf(max230) }
   437  func V2_4_0() *Versions  { return zkBrokerOf(max240) }
   438  func V2_5_0() *Versions  { return zkBrokerOf(max250) }
   439  func V2_6_0() *Versions  { return zkBrokerOf(max260) }
   440  func V2_7_0() *Versions  { return zkBrokerOf(max270) }
   441  func V2_8_0() *Versions  { return zkBrokerOf(max280) }
   442  func V3_0_0() *Versions  { return zkBrokerOf(max300) }
   443  func V3_1_0() *Versions  { return zkBrokerOf(max310) }
   444  func V3_2_0() *Versions  { return zkBrokerOf(max320) }
   445  func V3_3_0() *Versions  { return zkBrokerOf(max330) }
   446  func V3_4_0() *Versions  { return zkBrokerOf(max340) }
   447  func V3_5_0() *Versions  { return zkBrokerOf(max350) }
   448  
   449  /* TODO wait for franz-go v1.16
   450  func V3_6_0() *Versions  { return zkBrokerOf(max360) }
   451  */
   452  
   453  func zkBrokerOf(lks listenerKeys) *Versions {
   454  	return &Versions{lks.filter(zkBroker)}
   455  }
   456  
   457  type listener uint8
   458  
   459  func (l listener) has(target listener) bool {
   460  	return l&target != 0
   461  }
   462  
   463  const (
   464  	zkBroker listener = 1 << iota
   465  	rBroker
   466  	rController
   467  )
   468  
   469  type listenerKey struct {
   470  	listener listener
   471  	version  int16
   472  }
   473  
   474  type listenerKeys []listenerKey
   475  
   476  func (lks listenerKeys) filter(listener listener) []int16 {
   477  	r := make([]int16, 0, len(lks))
   478  	for _, lk := range lks {
   479  		if lk.listener.has(listener) {
   480  			r = append(r, lk.version)
   481  		} else {
   482  			r = append(r, -1)
   483  		}
   484  	}
   485  	return r
   486  }
   487  
   488  // All requests before KRaft started being introduced support the zkBroker, but
   489  // KRaft changed that. Kafka commit 698319b8e2c1f6cb574f339eede6f2a5b1919b55
   490  // added which listeners support which API keys.
   491  func k(listeners ...listener) listenerKey {
   492  	var k listenerKey
   493  	for _, listener := range listeners {
   494  		k.listener |= listener
   495  	}
   496  	return k
   497  }
   498  
   499  func (l *listenerKey) inc() {
   500  	l.version++
   501  }
   502  
   503  // For the comments below, appends are annotated with the key being introduced,
   504  // while incs are annotated with the version the inc results in.
   505  
   506  func nextMax(prev listenerKeys, do func(listenerKeys) listenerKeys) listenerKeys {
   507  	return do(append(listenerKeys(nil), prev...))
   508  }
   509  
   510  var max080 = nextMax(nil, func(listenerKeys) listenerKeys {
   511  	return listenerKeys{
   512  		k(zkBroker, rBroker),              // 0 produce
   513  		k(zkBroker, rBroker, rController), // 1 fetch
   514  		k(zkBroker, rBroker),              // 2 list offset
   515  		k(zkBroker, rBroker),              // 3 metadata
   516  		k(zkBroker),                       // 4 leader and isr
   517  		k(zkBroker),                       // 5 stop replica
   518  		k(zkBroker),                       // 6 update metadata, actually not supported for a bit
   519  		k(zkBroker, rController),          // 7 controlled shutdown, actually not supported for a bit
   520  	}
   521  })
   522  
   523  var max081 = nextMax(max080, func(v listenerKeys) listenerKeys {
   524  	return append(v,
   525  		k(zkBroker, rBroker), // 8 offset commit KAFKA-965 db37ed0054
   526  		k(zkBroker, rBroker), // 9 offset fetch (same)
   527  	)
   528  })
   529  
   530  var max082 = nextMax(max081, func(v listenerKeys) listenerKeys {
   531  	v[8].inc() // 1 offset commit KAFKA-1462
   532  	v[9].inc() // 1 offset fetch KAFKA-1841 161b1aa16e I think?
   533  	return append(v,
   534  		k(zkBroker, rBroker), // 10 find coordinator KAFKA-1012 a670537aa3
   535  		k(zkBroker, rBroker), // 11 join group (same)
   536  		k(zkBroker, rBroker), // 12 heartbeat (same)
   537  	)
   538  })
   539  
   540  var max090 = nextMax(max082, func(v listenerKeys) listenerKeys {
   541  	v[0].inc() // 1 produce KAFKA-2136 436b7ddc38; KAFKA-2083 ?? KIP-13
   542  	v[1].inc() // 1 fetch (same)
   543  	v[6].inc() // 1 update metadata KAFKA-2411 d02ca36ca1
   544  	v[7].inc() // 1 controlled shutdown (same)
   545  	v[8].inc() // 2 offset commit KAFKA-1634
   546  	return append(v,
   547  		k(zkBroker, rBroker), // 13 leave group KAFKA-2397 636e14a991
   548  		k(zkBroker, rBroker), // 14 sync group KAFKA-2464 86eb74d923
   549  		k(zkBroker, rBroker), // 15 describe groups KAFKA-2687 596c203af1
   550  		k(zkBroker, rBroker), // 16 list groups KAFKA-2687 596c203af1
   551  	)
   552  })
   553  
   554  var max0100 = nextMax(max090, func(v listenerKeys) listenerKeys {
   555  	v[0].inc() // 2 produce KAFKA-3025 45c8195fa1 KIP-31 KIP-32
   556  	v[1].inc() // 2 fetch (same)
   557  	v[3].inc() // 1 metadata KAFKA-3306 33d745e2dc
   558  	v[6].inc() // 2 update metadata KAFKA-1215 951e30adc6
   559  	return append(v,
   560  		k(zkBroker, rBroker, rController), // 17 sasl handshake KAFKA-3149 5b375d7bf9
   561  		k(zkBroker, rBroker, rController), // 18 api versions KAFKA-3307 8407dac6ee
   562  	)
   563  })
   564  
   565  var max0101 = nextMax(max0100, func(v listenerKeys) listenerKeys {
   566  	v[1].inc()  // 3 fetch KAFKA-2063 d04b0998c0 KIP-74
   567  	v[2].inc()  // 1 list offset KAFKA-4148 eaaa433fc9 KIP-79
   568  	v[3].inc()  // 2 metadata KAFKA-4093 ecc1fb10fa KIP-78
   569  	v[11].inc() // 1 join group KAFKA-3888 40b1dd3f49 KIP-62
   570  	return append(v,
   571  		k(zkBroker, rBroker, rController), // 19 create topics KAFKA-2945 fc47b9fa6b
   572  		k(zkBroker, rBroker, rController), // 20 delete topics KAFKA-2946 539633ba0e
   573  	)
   574  })
   575  
   576  var max0102 = nextMax(max0101, func(v listenerKeys) listenerKeys {
   577  	v[6].inc()  // 3 update metadata KAFKA-4565 d25671884b KIP-103
   578  	v[19].inc() // 1 create topics KAFKA-4591 da57bc27e7 KIP-108
   579  	return v
   580  })
   581  
   582  var max0110 = nextMax(max0102, func(v listenerKeys) listenerKeys {
   583  	v[0].inc()  // 3 produce KAFKA-4816 5bd06f1d54 KIP-98
   584  	v[1].inc()  // 4 fetch (same)
   585  	v[1].inc()  // 5 fetch KAFKA-4586 8b05ad406d KIP-107
   586  	v[3].inc()  // 4 metadata KAFKA-5291 7311dcbc53 (3 below)
   587  	v[9].inc()  // 2 offset fetch KAFKA-3853 c2d9b95f36 KIP-98
   588  	v[10].inc() // 1 find coordinator KAFKA-5043 d0e7c6b930 KIP-98
   589  	v = append(v,
   590  		k(zkBroker, rBroker), // 21 delete records KAFKA-4586 see above
   591  		k(zkBroker, rBroker), // 22 init producer id KAFKA-4817 bdf4cba047 KIP-98 (raft added in KAFKA-12620 e97cff2702b6ba836c7925caa36ab18066a7c95d KIP-730)
   592  		k(zkBroker, rBroker), // 23 offset for leader epoch KAFKA-1211 0baea2ac13 KIP-101
   593  
   594  		k(zkBroker, rBroker), // 24 add partitions to txn KAFKA-4990 865d82af2c KIP-98 (raft 3.0 6e857c531f14d07d5b05f174e6063a124c917324)
   595  		k(zkBroker, rBroker), // 25 add offsets to txn (same, same raft)
   596  		k(zkBroker, rBroker), // 26 end txn (same, same raft)
   597  		k(zkBroker, rBroker), // 27 write txn markers (same)
   598  		k(zkBroker, rBroker), // 28 txn offset commit (same, same raft)
   599  
   600  		// raft broker / controller added in 5b0c58ed53c420e93957369516f34346580dac95
   601  		k(zkBroker, rBroker, rController), // 29 describe acls KAFKA-3266 9815e18fef KIP-140
   602  		k(zkBroker, rBroker, rController), // 30 create acls (same)
   603  		k(zkBroker, rBroker, rController), // 31 delete acls (same)
   604  
   605  		k(zkBroker, rBroker),              // 32 describe configs KAFKA-3267 972b754536 KIP-133
   606  		k(zkBroker, rBroker, rController), // 33 alter configs (same) (raft broker 3.0 6e857c531f14d07d5b05f174e6063a124c917324, controller 273d66479dbee2398b09e478ffaf996498d1ab34)
   607  	)
   608  
   609  	// KAFKA-4954 0104b657a1 KIP-124
   610  	v[2].inc()  // 2 list offset (reused in e71dce89c0 KIP-98)
   611  	v[3].inc()  // 3 metadata
   612  	v[8].inc()  // 3 offset commit
   613  	v[9].inc()  // 3 offset fetch
   614  	v[11].inc() // 2 join group
   615  	v[12].inc() // 1 heartbeat
   616  	v[13].inc() // 1 leave group
   617  	v[14].inc() // 1 sync group
   618  	v[15].inc() // 1 describe groups
   619  	v[16].inc() // 1 list group
   620  	v[18].inc() // 1 api versions
   621  	v[19].inc() // 2 create topics
   622  	v[20].inc() // 1 delete topics
   623  
   624  	return v
   625  })
   626  
   627  var max100 = nextMax(max0110, func(v listenerKeys) listenerKeys {
   628  	v[0].inc() // 4 produce KAFKA-4763 fc93fb4b61 KIP-112
   629  	v[1].inc() // 6 fetch (same)
   630  	v[3].inc() // 5 metadata (same)
   631  	v[4].inc() // 1 leader and isr (same)
   632  	v[6].inc() // 4 update metadata (same)
   633  
   634  	v[0].inc()  // 5 produce KAFKA-5793 94692288be
   635  	v[17].inc() // 1 sasl handshake KAFKA-4764 8fca432223 KIP-152
   636  
   637  	return append(v,
   638  		k(zkBroker, rBroker),              // 34 alter replica log dirs KAFKA-5694 adefc8ea07 KIP-113
   639  		k(zkBroker, rBroker),              // 35 describe log dirs (same)
   640  		k(zkBroker, rBroker, rController), // 36 sasl authenticate KAFKA-4764 (see above)
   641  		k(zkBroker, rBroker, rController), // 37 create partitions KAFKA-5856 5f6393f9b1 KIP-195 (raft 3.0 6e857c531f14d07d5b05f174e6063a124c917324)
   642  	)
   643  })
   644  
   645  var max110 = nextMax(max100, func(v listenerKeys) listenerKeys {
   646  	v = append(v,
   647  		k(zkBroker),          // 38 create delegation token KAFKA-4541 27a8d0f9e7 under KAFKA-1696 KIP-48
   648  		k(zkBroker),          // 39 renew delegation token (same)
   649  		k(zkBroker),          // 40 expire delegation token (same)
   650  		k(zkBroker),          // 41 describe delegation token (same)
   651  		k(zkBroker, rBroker), // 42 delete groups KAFKA-6275 1ed6da7cc8 KIP-229
   652  	)
   653  
   654  	v[1].inc()  // 7 fetch KAFKA-6254 7fe1c2b3d3 KIP-227
   655  	v[32].inc() // 1 describe configs KAFKA-6241 b814a16b96 KIP-226
   656  
   657  	return v
   658  })
   659  
   660  var max200 = nextMax(max110, func(v listenerKeys) listenerKeys {
   661  	v[0].inc()  // 6 produce KAFKA-6028 1facab387f KIP-219
   662  	v[1].inc()  // 8 fetch (same)
   663  	v[2].inc()  // 3 list offset (same)
   664  	v[3].inc()  // 6 metadata (same)
   665  	v[8].inc()  // 4 offset commit (same)
   666  	v[9].inc()  // 4 offset fetch (same)
   667  	v[10].inc() // 2 find coordinator (same)
   668  	v[11].inc() // 3 join group (same)
   669  	v[12].inc() // 2 heartbeat (same)
   670  	v[13].inc() // 2 leave group (same)
   671  	v[14].inc() // 2 sync group (same)
   672  	v[15].inc() // 2 describe groups (same)
   673  	v[16].inc() // 2 list group (same)
   674  	v[18].inc() // 2 api versions (same)
   675  	v[19].inc() // 3 create topics (same)
   676  	v[20].inc() // 2 delete topics (same)
   677  	v[21].inc() // 1 delete records (same)
   678  	v[22].inc() // 1 init producer id (same)
   679  	v[24].inc() // 1 add partitions to txn (same)
   680  	v[25].inc() // 1 add offsets to txn (same)
   681  	v[26].inc() // 1 end txn (same)
   682  	v[28].inc() // 1 txn offset commit (same)
   683  	// 29, 30, 31 bumped below, but also had throttle changes
   684  	v[32].inc() // 2 describe configs (same)
   685  	v[33].inc() // 1 alter configs (same)
   686  	v[34].inc() // 1 alter replica log dirs (same)
   687  	v[35].inc() // 1 describe log dirs (same)
   688  	v[37].inc() // 1 create partitions (same)
   689  	v[38].inc() // 1 create delegation token (same)
   690  	v[39].inc() // 1 renew delegation token (same)
   691  	v[40].inc() // 1 expire delegation token (same)
   692  	v[41].inc() // 1 describe delegation token (same)
   693  	v[42].inc() // 1 delete groups (same)
   694  
   695  	v[29].inc() // 1 describe acls KAFKA-6841 b3aa655a70 KIP-290
   696  	v[30].inc() // 1 create acls (same)
   697  	v[31].inc() // 1 delete acls (same)
   698  
   699  	v[23].inc() // 1 offset for leader epoch KAFKA-6361 9679c44d2b KIP-279
   700  	return v
   701  })
   702  
   703  var max210 = nextMax(max200, func(v listenerKeys) listenerKeys {
   704  	v[8].inc() // 5 offset commit KAFKA-4682 418a91b5d4 KIP-211
   705  
   706  	v[20].inc() // 3 delete topics KAFKA-5975 04770916a7 KIP-322
   707  
   708  	v[1].inc()  // 9 fetch KAFKA-7333 05ba5aa008 KIP-320
   709  	v[2].inc()  // 4 list offset (same)
   710  	v[3].inc()  // 7 metadata (same)
   711  	v[8].inc()  // 6 offset commit (same)
   712  	v[9].inc()  // 5 offset fetch (same)
   713  	v[23].inc() // 2 offset for leader epoch (same, also in Kafka PR #5635 79ad9026a6)
   714  	v[28].inc() // 2 txn offset commit (same)
   715  
   716  	v[0].inc() // 7 produce KAFKA-4514 741cb761c5 KIP-110
   717  	v[1].inc() // 10 fetch (same)
   718  	return v
   719  })
   720  
   721  var max220 = nextMax(max210, func(v listenerKeys) listenerKeys {
   722  	v[2].inc()  // 5 list offset KAFKA-2334 152292994e KIP-207
   723  	v[11].inc() // 4 join group KAFKA-7824 9a9310d074 KIP-394
   724  	v[36].inc() // 1 sasl authenticate KAFKA-7352 e8a3bc7425 KIP-368
   725  
   726  	v[4].inc() // 2 leader and isr KAFKA-7235 2155c6d54b KIP-380
   727  	v[5].inc() // 1 stop replica (same)
   728  	v[6].inc() // 5 update metadata (same)
   729  	v[7].inc() // 2 controlled shutdown (same)
   730  
   731  	return append(v,
   732  		k(zkBroker, rBroker, rController), // 43 elect preferred leaders KAFKA-5692 269b65279c KIP-183 (raft 3.0 6e857c531f14d07d5b05f174e6063a124c917324)
   733  	)
   734  })
   735  
   736  var max230 = nextMax(max220, func(v listenerKeys) listenerKeys {
   737  	v[3].inc()  // 8 metadata KAFKA-7922 a42f16f980 KIP-430
   738  	v[15].inc() // 3 describe groups KAFKA-7922 f11fa5ef40 KIP-430
   739  
   740  	v[1].inc()  // 11 fetch KAFKA-8365 e2847e8603 KIP-392
   741  	v[23].inc() // 3 offset for leader epoch (same)
   742  
   743  	v[11].inc() // 5 join group KAFKA-7862 0f995ba6be KIP-345
   744  	v[8].inc()  // 7 offset commit KAFKA-8225 9fa331b811 KIP-345
   745  	v[12].inc() // 3 heartbeat (same)
   746  	v[14].inc() // 3 sync group (same)
   747  
   748  	return append(v,
   749  		k(zkBroker, rBroker, rController), // 44 incremental alter configs KAFKA-7466 3b1524c5df KIP-339
   750  	)
   751  })
   752  
// max240 is derived from max230 through nextMax (going by its name this is
// presumably the table for Kafka 2.4.0 — confirm against the release list).
//
// Reading the trailing comments: on an inc() line the leading number is the
// max version the key ends up at after that bump, followed by the request
// name and the JIRA / commit / KIP it came from; on an appended k(...) entry
// the leading number is the new request key itself.
var max240 = nextMax(max230, func(v listenerKeys) listenerKeys {
	v[4].inc()  // 3 leader and isr KAFKA-8345 81900d0ba0 KIP-455
	v[15].inc() // 4 describe groups KAFKA-8538 f8db022b08 KIP-345
	v[19].inc() // 4 create topics KAFKA-8305 8e161580b8 KIP-464
	v[43].inc() // 1 elect preferred leaders KAFKA-8286 121308cc7a KIP-460
	// Three new request keys (45, 46, 47) are added in this release.
	v = append(v,
		// raft added in e07de97a4ce730a2755db7eeacb9b3e1f69a12c8 for the following two
		k(zkBroker, rBroker, rController), // 45 alter partition reassignments KAFKA-8345 81900d0ba0 KIP-455
		k(zkBroker, rBroker, rController), // 46 list partition reassignments (same)

		k(zkBroker, rBroker), // 47 offset delete KAFKA-8730 e24d0e22ab KIP-496
	)

	v[13].inc() // 3 leave group KAFKA-8221 74c90f46c3 KIP-345

	// introducing flexible versions; 24 were bumped
	//
	// Keys 4, 13, 15, 19 and 43 were already bumped once above, so each of
	// them moves by two versions within this function.
	v[3].inc()  // 9 metadata KAFKA-8885 apache/kafka#7325 KIP-482
	v[4].inc()  // 4 leader and isr (same)
	v[5].inc()  // 2 stop replica (same)
	v[6].inc()  // 6 update metadata (same)
	v[7].inc()  // 3 controlled shutdown (same)
	v[8].inc()  // 8 offset commit (same)
	v[9].inc()  // 6 offset fetch (same)
	v[10].inc() // 3 find coordinator (same)
	v[11].inc() // 6 join group (same)
	v[12].inc() // 4 heartbeat (same)
	v[13].inc() // 4 leave group (same)
	v[14].inc() // 4 sync group (same)
	v[15].inc() // 5 describe groups (same)
	v[16].inc() // 3 list group (same)
	v[18].inc() // 3 api versions (same, also KIP-511 [non-flexible fields added])
	v[19].inc() // 5 create topics (same)
	v[20].inc() // 4 delete topics (same)
	v[22].inc() // 2 init producer id (same)
	v[38].inc() // 2 create delegation token (same)
	v[42].inc() // 2 delete groups (same)
	v[43].inc() // 2 elect preferred leaders (same)
	v[44].inc() // 1 incremental alter configs (same)
	// also 45, 46; not bumped since in same release

	// Create topics (19) was bumped up to 5 in KAFKA-8907 5d0052fe00
	// KIP-525, then 6 in the above bump, then back down to 5 once the
	// tagged PR was merged (KAFKA-8932 1f1179ea64 for the bump down).
	//
	// Net effect in this function: key 19 is incremented exactly twice and
	// ends at 5, matching the per-line annotations above.

	v[0].inc() // 8 produce KAFKA-8729 f6f24c4700 KIP-467

	return v
})
   801  
   802  var max250 = nextMax(max240, func(v listenerKeys) listenerKeys {
   803  	v[22].inc() // 3 init producer id KAFKA-8710 fecb977b25 KIP-360
   804  	v[9].inc()  // 7 offset fetch KAFKA-9346 6da70f9b95 KIP-447
   805  
   806  	// more flexible versions, KAFKA-9420 0a2569e2b99 KIP-482
   807  	// 6 bumped, then sasl handshake reverted later in 1a8dcffe4
   808  	v[36].inc() // 2 sasl authenticate
   809  	v[37].inc() // 2 create partitions
   810  	v[39].inc() // 2 renew delegation token
   811  	v[40].inc() // 2 expire delegation token
   812  	v[41].inc() // 2 describe delegation token
   813  
   814  	v[28].inc() // 3 txn offset commit KAFKA-9365 ed7c071e07f KIP-447
   815  
   816  	v[29].inc() // 2 describe acls KAFKA-9026 40b35178e5 KIP-482 (for flexible versions)
   817  	v[30].inc() // 2 create acls KAFKA-9027 738e14edb KIP-482 (flexible)
   818  	v[31].inc() // 2 delete acls KAFKA-9028 738e14edb KIP-482 (flexible)
   819  
   820  	v[11].inc() // 7 join group KAFKA-9437 96c4ce480 KIP-559
   821  	v[14].inc() // 5 sync group (same)
   822  
   823  	return v
   824  })
   825  
   826  var max260 = nextMax(max250, func(v listenerKeys) listenerKeys {
   827  	v[21].inc() // 2 delete records KAFKA-8768 f869e33ab KIP-482 (opportunistic bump for flexible versions)
   828  	v[35].inc() // 2 describe log dirs KAFKA-9435 4f1e8331ff9 KIP-482 (same)
   829  
   830  	v = append(v,
   831  		k(zkBroker, rBroker),              // 48 describe client quotas KAFKA-7740 227a7322b KIP-546 (raft in 5964401bf9aab611bd4a072941bd1c927e044258)
   832  		k(zkBroker, rBroker, rController), // 49 alter client quotas (same)
   833  	)
   834  
   835  	v[5].inc() // 3 stop replica KAFKA-9539 7c7d55dbd KIP-570
   836  
   837  	v[16].inc() // 4 list group KAFKA-9130 fe948d39e KIP-518
   838  	v[32].inc() // 3 describe configs KAFKA-9494 af3b8b50f2 KIP-569
   839  
   840  	return v
   841  })
   842  
// max270 is derived from max260 through nextMax (going by its name this is
// presumably the table for Kafka 2.7.0 — confirm against the release list).
//
// It bumps seven existing keys and appends request keys 50 through 57. On an
// inc() line the leading number in the trailing comment is the resulting max
// version; on an appended k(...) entry it is the new request key.
var max270 = nextMax(max260, func(v listenerKeys) listenerKeys {
	// KAFKA-10163 a5ffd1ca44c KIP-599
	v[37].inc() // 3 create partitions
	v[19].inc() // 6 create topics (same)
	v[20].inc() // 5 delete topics (same)

	// KAFKA-9911 b937ec7567 KIP-588
	v[22].inc() // 4 init producer id
	v[24].inc() // 2 add partitions to txn
	v[25].inc() // 2 add offsets to txn
	v[26].inc() // 2 end txn

	v = append(v,
		k(zkBroker, rBroker, rController), // 50 describe user scram creds, KAFKA-10259 e8524ccd8fca0caac79b844d87e98e9c055f76fb KIP-554; 38c409cf33c kraft
		k(zkBroker, rBroker, rController), // 51 alter user scram creds, same
	)

	// KAFKA-10435 634c9175054cc69d10b6da22ea1e95edff6a4747 KIP-595
	// This opted in fetch request to flexible versions.
	//
	// KAFKA-10487: further change in aa5263fba903c85812c0c31443f7d49ee371e9db
	v[1].inc() // 12 fetch

	// KAFKA-10492 b7c8490cf47b0c18253d6a776b2b35c76c71c65d KIP-595
	//
	// These are the first requests that are raft only.
	//
	// The Versions doc comment notes that 52-55 were added to Kafka but were
	// not part of the 2.7.0 release, so ApiVersionsResponse goes from 51 to
	// 56; none of these four entries lists zkBroker.
	v = append(v,
		k(rController),          // 52 vote
		k(rController),          // 53 begin quorum epoch
		k(rController),          // 54 end quorum epoch
		k(rBroker, rController), // 55 describe quorum
	)

	// KAFKA-8836 57de67db22eb373f92ec5dd449d317ed2bc8b8d1 KIP-497
	v = append(v,
		k(zkBroker, rController), // 56 alter isr
	)

	// KAFKA-10028 fb4f297207ef62f71e4a6d2d0dac75752933043d KIP-584
	return append(v,
		k(zkBroker, rBroker, rController), // 57 update features (rbroker 3.0 6e857c531f14d07d5b05f174e6063a124c917324; rcontroller 3.2 55ff5d360381af370fe5b3a215831beac49571a4 KIP-778  KAFKA-13823)
	)
})
   886  
// max280 is derived from max270 through nextMax (going by its name this is
// presumably the table for Kafka 2.8.0 — confirm against the release list).
//
// It appends request keys 58 through 64 and bumps a number of existing keys.
// The appends are interleaved with the bumps, so the key number of each new
// entry depends on the order of the append calls below. On an inc() line the
// leading number in the trailing comment is the resulting max version; on an
// appended k(...) entry it is the new request key.
var max280 = nextMax(max270, func(v listenerKeys) listenerKeys {
	// KAFKA-10181 KAFKA-10181 KIP-590
	// NOTE(review): the JIRA id appears twice above where other entries list
	// a commit hash — confirm which commit was intended.
	v = append(v,
		k(zkBroker, rController), // 58 envelope, controller first, zk in KAFKA-14446 8b045dcbf6b89e1a9594ff95642d4882765e4b0d KIP-866 Kafka 3.4
	)

	// KAFKA-10729 85f94d50271c952c3e9ee49c4fc814c0da411618 KIP-482
	// (flexible bumps)
	v[0].inc()  // 9 produce
	v[2].inc()  // 6 list offsets
	v[23].inc() // 4 offset for leader epoch
	v[24].inc() // 3 add partitions to txn
	v[25].inc() // 3 add offsets to txn
	v[26].inc() // 3 end txn
	v[27].inc() // 1 write txn markers
	v[32].inc() // 4 describe configs
	v[33].inc() // 2 alter configs
	v[34].inc() // 2 alter replica log dirs
	v[48].inc() // 1 describe client quotas
	v[49].inc() // 1 alter client quotas

	// KAFKA-10547 5c921afa4a593478f7d1c49e5db9d787558d0d5e KIP-516
	v[3].inc() // 10 metadata
	v[6].inc() // 7 update metadata

	// KAFKA-10545 1dd1e7f945d7a8c1dc177223cd88800680f1ff46 KIP-516
	v[4].inc() // 5 leader and isr

	// KAFKA-10427 2023aed59d863278a6302e03066d387f994f085c KIP-630
	v = append(v,
		k(rController), // 59 fetch snapshot
	)

	// KAFKA-12204 / KAFKA-10851 302eee63c479fd4b955c44f1058a5e5d111acb57 KIP-700
	v = append(v,
		k(zkBroker, rBroker), // 60 describe cluster
	)

	// KAFKA-12212 7a1d1d9a69a241efd68e572badee999229b3942f KIP-700
	// Second metadata bump in this function (10 above, 11 here).
	v[3].inc() // 11 metadata

	// KAFKA-10764 4f588f7ca2a1c5e8dd845863da81425ac69bac92 KIP-516
	v[19].inc() // 7 create topics
	v[20].inc() // 6 delete topics

	// KAFKA-12238 e9edf104866822d9e6c3b637ffbf338767b5bf27 KIP-664
	v = append(v,
		k(zkBroker, rBroker), // 61 describe producers
	)

	// KAFKA-12248 a022072df3c8175950c03263d2bbf2e3ea7a7a5d KIP-500
	// (commit mentions KIP-500, these are actually described in KIP-631)
	// Broker registration was later updated in d9bb2ef596343da9402bff4903b129cff1f7c22b
	v = append(v,
		k(rController), // 62 broker registration
		k(rController), // 63 broker heartbeat
	)

	// KAFKA-12249 3f36f9a7ca153a9d221f6bedeb7d1503aa18eff1 KIP-500 / KIP-631
	// Renamed from Decommission to Unregister in 06dce721ec0185d49fac37775dbf191d0e80e687
	v = append(v,
		// kraft broker added in 7143267f71ca0c14957d8560fbc42a5f8aac564d
		k(rBroker, rController), // 64 unregister broker
	)
	return v
})
   953  
   954  var max300 = nextMax(max280, func(v listenerKeys) listenerKeys {
   955  	// KAFKA-12267 3f09fb97b6943c0612488dfa8e5eab8078fd7ca0 KIP-664
   956  	v = append(v,
   957  		k(zkBroker, rBroker), // 65 describe transactions
   958  	)
   959  
   960  	// KAFKA-12369 3708a7c6c1ecf1304f091dda1e79ae53ba2df489 KIP-664
   961  	v = append(v,
   962  		k(zkBroker, rBroker), // 66 list transactions
   963  	)
   964  
   965  	// KAFKA-12620 72d108274c98dca44514007254552481c731c958 KIP-730
   966  	// raft broker added in  e97cff2702b6ba836c7925caa36ab18066a7c95d
   967  	v = append(v,
   968  		k(zkBroker, rController), // 67 allocate producer ids
   969  	)
   970  
   971  	// KAFKA-12541 bd72ef1bf1e40feb3bc17349a385b479fa5fa530 KIP-734
   972  	v[2].inc() // 7 list offsets
   973  
   974  	// KAFKA-12663 f5d5f654db359af077088685e29fbe5ea69616cf KIP-699
   975  	v[10].inc() // 4 find coordinator
   976  
   977  	// KAFKA-12234 e00c0f3719ad0803620752159ef8315d668735d6 KIP-709
   978  	v[9].inc() // 8 offset fetch
   979  
   980  	return v
   981  })
   982  
   983  var max310 = nextMax(max300, func(v listenerKeys) listenerKeys {
   984  	// KAFKA-10580 2b8aff58b575c199ee8372e5689420c9d77357a5 KIP-516
   985  	v[1].inc() // 13 fetch
   986  
   987  	// KAFKA-10744 1d22b0d70686aef5689b775ea2ea7610a37f3e8c KIP-516
   988  	v[3].inc() // 12 metadata
   989  
   990  	return v
   991  })
   992  
   993  var max320 = nextMax(max310, func(v listenerKeys) listenerKeys {
   994  	// KAFKA-13495 69645f1fe5103adb00de6fa43152e7df989f3aea KIP-800
   995  	v[11].inc() // 8 join group
   996  
   997  	// KAFKA-13496 bf609694f83931990ce63e0123f811e6475820c5 KIP-800
   998  	v[13].inc() // 5 leave group
   999  
  1000  	// KAFKA-13527 31fca1611a6780e8a8aa3ac21618135201718e32 KIP-784
  1001  	v[35].inc() // 3 describe log dirs
  1002  
  1003  	// KAFKA-13435 c8fbe26f3bd3a7c018e7619deba002ee454208b9 KIP-814
  1004  	v[11].inc() // 9 join group
  1005  
  1006  	// KAFKA-13587 52621613fd386203773ba93903abd50b46fa093a KIP-704
  1007  	v[4].inc()  // 6 leader and isr
  1008  	v[56].inc() // 1 alter isr => alter partition
  1009  
  1010  	return v
  1011  })
  1012  
  1013  var max330 = nextMax(max320, func(v listenerKeys) listenerKeys {
  1014  	// KAFKA-13823 55ff5d360381af370fe5b3a215831beac49571a4 KIP-778
  1015  	v[57].inc() // 1 update features
  1016  
  1017  	// KAFKA-13958 4fcfd9ddc4a8da3d4cfbb69268c06763352e29a9 KIP-827
  1018  	v[35].inc() // 4 describe log dirs
  1019  
  1020  	// KAFKA-841 f83d95d9a28 KIP-841
  1021  	v[56].inc() // 2 alter partition
  1022  
  1023  	// KAFKA-13888 a126e3a622f KIP-836
  1024  	v[55].inc() // 1 describe quorum
  1025  
  1026  	// KAFKA-6945 d65d8867983 KIP-373
  1027  	v[29].inc() // 3 describe acls
  1028  	v[30].inc() // 3 create acls
  1029  	v[31].inc() // 3 delete acls
  1030  	v[38].inc() // 3 create delegation token
  1031  	v[41].inc() // 3 describe delegation token
  1032  
  1033  	return v
  1034  })
  1035  
  1036  var max340 = nextMax(max330, func(v listenerKeys) listenerKeys {
  1037  	// KAFKA-14304 7b7e40a536a79cebf35cc278b9375c8352d342b9 KIP-866
  1038  	// KAFKA-14448 67c72596afe58363eceeb32084c5c04637a33831 added BrokerRegistration
  1039  	// KAFKA-14493 db490707606855c265bc938e1b236070e0e2eba5 changed BrokerRegistration
  1040  	// KAFKA-14304 0bb05d8679b684ad8fbb2eb40dfc00066186a75a changed BrokerRegistration back to a bool...
  1041  	// 5b521031edea8ea7cbcca7dc24a58429423740ff added tag to ApiVersions
  1042  	v[4].inc()  // 7 leader and isr
  1043  	v[5].inc()  // 4 stop replica
  1044  	v[6].inc()  // 8 update metadata
  1045  	v[62].inc() // 1 broker registration
  1046  	return v
  1047  })
  1048  
  1049  var max350 = nextMax(max340, func(v listenerKeys) listenerKeys {
  1050  	// KAFKA-13369 7146ac57ba9ddd035dac992b9f188a8e7677c08d KIP-405
  1051  	v[1].inc() // 14 fetch
  1052  	v[2].inc() // 8 list offsets
  1053  
  1054  	v[1].inc()  // 15 fetch // KAFKA-14617 79b5f7f1ce2 KIP-903
  1055  	v[56].inc() // 3 alter partition // KAFKA-14617 8c88cdb7186b1d594f991eb324356dcfcabdf18a KIP-903
  1056  	return v
  1057  })
  1058  
  1059  var max360 = nextMax(max350, func(v listenerKeys) listenerKeys {
  1060  	// KAFKA-14402 29a1a16668d76a1cc04ec9e39ea13026f2dce1de KIP-890
  1061  	// Later commit swapped to stable
  1062  	v[24].inc() // 4 add partitions to txn
  1063  	return v
  1064  })
  1065  
  1066  var (
  1067  	maxStable = max360
  1068  	maxTip    = nextMax(maxStable, func(v listenerKeys) listenerKeys {
  1069  		return v
  1070  	})
  1071  )
  1072  

View as plain text