...

Source file src/github.com/twmb/franz-go/pkg/kmsg/api.go

Documentation: github.com/twmb/franz-go/pkg/kmsg

     1  // Package kmsg contains Kafka request and response types and autogenerated
     2  // serialization and deserialization functions.
     3  //
     4  // This package may bump major versions whenever Kafka makes a backwards
     5  // incompatible protocol change, per the types chosen for this package. For
     6  // example, Kafka can change a field from non-nullable to nullable, which would
     7  // require changing a field from a non-pointer to a pointer. We could get
     8  // around this by making everything an opaque struct and having getters, but
     9  // that is more tedious than having a few rare major version bumps.
    10  //
    11  // If you are using this package directly with kgo, you should either always
    12  // use New functions, or Default functions after creating structs, or you
    13  // should pin the max supported version. If you use New functions, you will
    14  // have safe defaults as new fields are added. If you pin versions, you will
    15  // avoid new fields being used. If you do neither of these, you may opt in to
    16  // new fields that do not have safe zero value defaults, and this may lead to
    17  // errors or unexpected results.
    18  //
    19  // Thus, whenever you initialize a struct from this package, do the following:
    20  //
    21  //	struct := kmsg.NewFoo()
    22  //	struct.Field = "value I want to set"
    23  //
    24  // Most of this package is generated, but a few things are manual. What is
    25  // manual: all interfaces, the RequestFormatter, record / message / record
    26  // batch reading, and sticky member metadata serialization.
    27  package kmsg
    28  
    29  import (
    30  	"context"
    31  	"sort"
    32  
    33  	"github.com/twmb/franz-go/pkg/kmsg/internal/kbin"
    34  )
    35  
    36  //go:generate cp ../kbin/primitives.go internal/kbin/
    37  
    38  // Requestor issues requests. Notably, both kgo.Client and kgo.Broker implement
    39  // Requestor. All Requests in this package have a RequestWith function for
    40  // type-safe requests.
    41  type Requestor interface {
    42  	// Request issues a Request and returns either a Response or an error.
    43  	Request(context.Context, Request) (Response, error)
    44  }
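
As a minimal sketch of how calling code uses a Requestor (this assumes the generated MetadataRequest type, its NewPtrMetadataRequest constructor, and its RequestWith helper from elsewhere in this package, with the kgo and kmsg packages imported and a kgo.Client as the Requestor):

	// fetchMetadata issues a metadata request through any Requestor; a
	// kgo.Client satisfies the interface.
	func fetchMetadata(ctx context.Context, cl *kgo.Client) (*kmsg.MetadataResponse, error) {
		req := kmsg.NewPtrMetadataRequest() // New functions give safe defaults
		return req.RequestWith(ctx, cl)     // type safe: returns *kmsg.MetadataResponse
	}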
    45  
    46  // Request represents a type that can be requested to Kafka.
    47  type Request interface {
    48  	// Key returns the protocol key for this message kind.
    49  	Key() int16
    50  	// MaxVersion returns the maximum protocol version this message
    51  	// supports.
    52  	//
    53  	// This function allows one to implement a client that chooses message
    54  	// versions based off of the lower of a message's max version in the
    55  	// client and the broker's max supported version.
    56  	MaxVersion() int16
    57  	// SetVersion sets the version to use for this request and response.
    58  	SetVersion(int16)
    59  	// GetVersion returns the version currently set to use for the request
    60  	// and response.
    61  	GetVersion() int16
    62  	// IsFlexible returns whether the request at its current version is
    63  	// "flexible" as per the KIP-482.
    64  	IsFlexible() bool
    65  	// AppendTo appends this message in wire protocol form to a slice and
    66  	// returns the slice.
    67  	AppendTo([]byte) []byte
    68  	// ReadFrom parses all of the input slice into the request type.
    69  	//
    70  	// This should return an error if too little data is input.
    71  	ReadFrom([]byte) error
    72  	// ResponseKind returns an empty Response that is expected for
    73  	// this message request.
    74  	ResponseKind() Response
    75  }
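
The MaxVersion, SetVersion, and GetVersion methods are what a client uses to negotiate a protocol version with a broker. A sketch of that negotiation using only the interface above (pinVersion and brokerMax are hypothetical names):

	// pinVersion clamps a request to the lower of the request's own max
	// version and the broker's reported max version.
	func pinVersion(r kmsg.Request, brokerMax int16) {
		v := r.MaxVersion()
		if brokerMax < v {
			v = brokerMax
		}
		r.SetVersion(v)
	}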
    76  
    77  // AdminRequest represents a request that must be issued to Kafka controllers.
    78  type AdminRequest interface {
    79  	// IsAdminRequest is a method attached to requests that must be
    80  	// issued to Kafka controllers.
    81  	IsAdminRequest()
    82  	Request
    83  }
    84  
    85  // GroupCoordinatorRequest represents a request that must be issued to a
    86  // group coordinator.
    87  type GroupCoordinatorRequest interface {
    88  	// IsGroupCoordinatorRequest is a method attached to requests that
    89  	// must be issued to group coordinators.
    90  	IsGroupCoordinatorRequest()
    91  	Request
    92  }
    93  
    94  // TxnCoordinatorRequest represents a request that must be issued to a
    95  // transaction coordinator.
    96  type TxnCoordinatorRequest interface {
    97  	// IsTxnCoordinatorRequest is a method attached to requests that
    98  	// must be issued to transaction coordinators.
    99  	IsTxnCoordinatorRequest()
   100  	Request
   101  }
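
The three marker interfaces above only tag where a request must be routed; a client can detect them with a type switch. A small sketch (routeOf is a hypothetical helper):

	// routeOf reports which broker role must serve a request.
	func routeOf(r kmsg.Request) string {
		switch r.(type) {
		case kmsg.AdminRequest:
			return "controller"
		case kmsg.GroupCoordinatorRequest:
			return "group coordinator"
		case kmsg.TxnCoordinatorRequest:
			return "transaction coordinator"
		default:
			return "any broker"
		}
	}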
   102  
   103  // Response represents a type that Kafka responds with.
   104  type Response interface {
   105  	// Key returns the protocol key for this message kind.
   106  	Key() int16
   107  	// MaxVersion returns the maximum protocol version this message
   108  	// supports.
   109  	MaxVersion() int16
   110  	// SetVersion sets the version to use for this request and response.
   111  	SetVersion(int16)
   112  	// GetVersion returns the version currently set to use for the request
   113  	// and response.
   114  	GetVersion() int16
   115  	// IsFlexible returns whether the request at its current version is
   116  	// "flexible" as per the KIP-482.
   117  	IsFlexible() bool
   118  	// AppendTo appends this message in wire protocol form to a slice and
   119  	// returns the slice.
   120  	AppendTo([]byte) []byte
   121  	// ReadFrom parses all of the input slice into the response type.
   122  	//
   123  	// This should return an error if too little data is input.
   124  	ReadFrom([]byte) error
   125  	// RequestKind returns an empty Request that corresponds to
   126  	// this message response.
   127  	RequestKind() Request
   128  }
   129  
   130  // UnsafeReadFrom, implemented by all requests and responses generated in this
   131  // package, switches to using unsafe slice-to-string conversions when reading.
   132  // This can be used to avoid a lot of garbage, but it means you have to be
   133  // careful when using any strings in structs: if you hold onto the string, the
   134  // underlying response slice will not be garbage collected.
   135  type UnsafeReadFrom interface {
   136  	UnsafeReadFrom([]byte) error
   137  }
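
A sketch of opting into the unsafe path only when a response supports it (decode is a hypothetical helper; the raw slice must outlive any strings retained from the decoded response):

	// decode prefers the zero-copy UnsafeReadFrom when available and falls
	// back to the regular copying ReadFrom otherwise.
	func decode(raw []byte, resp kmsg.Response) error {
		if u, ok := resp.(kmsg.UnsafeReadFrom); ok {
			return u.UnsafeReadFrom(raw)
		}
		return resp.ReadFrom(raw)
	}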
   138  
   139  // ThrottleResponse represents a response that could have a throttle applied by
   140  // Kafka. Any response that implements ThrottleResponse also implements
   141  // SetThrottleResponse.
   142  //
   143  // Kafka 2.0.0 switched throttles from being applied before responses to being
   144  // applied after responses.
   145  type ThrottleResponse interface {
   146  	// Throttle returns the response's throttle millis value and
   147  	// whether Kafka applies the throttle after the response.
   148  	Throttle() (int32, bool)
   149  }
   150  
   151  // SetThrottleResponse sets the throttle in a response that can have a throttle
   152  // applied. Any kmsg interface that implements ThrottleResponse also implements
   153  // SetThrottleResponse.
   154  type SetThrottleResponse interface {
   155  	// SetThrottle sets the response's throttle millis value.
   156  	SetThrottle(int32)
   157  }
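
Because not every response carries a throttle, callers type-assert for it. A sketch (throttleOf is a hypothetical helper):

	// throttleOf returns a response's throttle millis and whether Kafka
	// applies the throttle after the response, if the response has one.
	func throttleOf(resp kmsg.Response) (millis int32, throttlesAfter, ok bool) {
		t, hasThrottle := resp.(kmsg.ThrottleResponse)
		if !hasThrottle {
			return 0, false, false
		}
		millis, throttlesAfter = t.Throttle()
		return millis, throttlesAfter, true
	}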
   158  
   159  // TimeoutRequest represents a request that has a TimeoutMillis field.
   160  // Any request that implements TimeoutRequest also implements SetTimeoutRequest.
   161  type TimeoutRequest interface {
   162  	// Timeout returns the request's timeout millis value.
   163  	Timeout() int32
   164  }
   165  
   166  // SetTimeoutRequest sets the timeout in a request that can have a timeout
   167  // applied. Any kmsg request that implements TimeoutRequest also implements
   168  // SetTimeoutRequest.
   169  type SetTimeoutRequest interface {
   170  	// SetTimeout sets the request's timeout millis value.
   171  	SetTimeout(timeoutMillis int32)
   172  }
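
A sketch of applying a timeout only to requests that expose a TimeoutMillis field (withTimeout is a hypothetical helper):

	// withTimeout sets TimeoutMillis on requests that have one and leaves
	// all other requests untouched.
	func withTimeout(req kmsg.Request, millis int32) {
		if t, ok := req.(kmsg.SetTimeoutRequest); ok {
			t.SetTimeout(millis)
		}
	}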
   173  
   174  // RequestFormatter formats requests.
   175  //
   176  // The default empty struct works correctly, but can be extended with the
   177  // NewRequestFormatter function.
   178  type RequestFormatter struct {
   179  	clientID *string
   180  }
   181  
   182  // RequestFormatterOpt applies options to a RequestFormatter.
   183  type RequestFormatterOpt interface {
   184  	apply(*RequestFormatter)
   185  }
   186  
   187  type formatterOpt struct{ fn func(*RequestFormatter) }
   188  
   189  func (opt formatterOpt) apply(f *RequestFormatter) { opt.fn(f) }
   190  
   191  // FormatterClientID attaches the given client ID to any issued request,
   192  // minus controlled shutdown v0, which uses its own special format.
   193  func FormatterClientID(id string) RequestFormatterOpt {
   194  	return formatterOpt{func(f *RequestFormatter) { f.clientID = &id }}
   195  }
   196  
   197  // NewRequestFormatter returns a RequestFormatter with the opts applied.
   198  func NewRequestFormatter(opts ...RequestFormatterOpt) *RequestFormatter {
   199  	a := new(RequestFormatter)
   200  	for _, opt := range opts {
   201  		opt.apply(a)
   202  	}
   203  	return a
   204  }
   205  
   206  // AppendRequest appends a full message request to dst, returning the updated
   207  // slice. This message is the full body that needs to be written to issue a
   208  // Kafka request.
   209  func (f *RequestFormatter) AppendRequest(
   210  	dst []byte,
   211  	r Request,
   212  	correlationID int32,
   213  ) []byte {
   214  	dst = append(dst, 0, 0, 0, 0) // reserve length
   215  	k := r.Key()
   216  	v := r.GetVersion()
   217  	dst = kbin.AppendInt16(dst, k)
   218  	dst = kbin.AppendInt16(dst, v)
   219  	dst = kbin.AppendInt32(dst, correlationID)
   220  	if k == 7 && v == 0 {
   221  		return dst
   222  	}
   223  
   224  	// Even with flexible versions, we do not use a compact client id.
   225  	// Clients issue ApiVersions immediately before knowing the broker
   226  	// version, and old brokers will not be able to understand a compact
   227  	// client id.
   228  	dst = kbin.AppendNullableString(dst, f.clientID)
   229  
   230  	// The flexible tags end the request header, and then the request
   231  	// body begins.
   232  	if r.IsFlexible() {
   233  		var numTags uint8
   234  		dst = append(dst, numTags)
   235  		if numTags != 0 {
   236  			// TODO when tags are added
   237  		}
   238  	}
   239  
   240  	// Now the request body.
   241  	dst = r.AppendTo(dst)
   242  
   243  	kbin.AppendInt32(dst[:0], int32(len(dst[4:])))
   244  	return dst
   245  }
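
A sketch of framing a request by hand with the formatter above (the generated NewPtrApiVersionsRequest constructor is assumed from elsewhere in this package; any Request works):

	f := kmsg.NewRequestFormatter(kmsg.FormatterClientID("example-client"))
	req := kmsg.NewPtrApiVersionsRequest()
	wire := f.AppendRequest(nil, req, 1) // 1 is the correlation ID
	// wire now holds the length prefix, request header, and request body,
	// ready to be written to a broker connection.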
   246  
   247  // StringPtr is a helper to return a pointer to a string.
   248  func StringPtr(in string) *string {
   249  	return &in
   250  }
   251  
   252  // ReadFrom decodes the various versions of sticky member metadata. A key
   253  // point of this type is that it does not contain a version number inside it,
   254  // but it is versioned: if decoding v1 fails, this falls back to v0.
   255  func (s *StickyMemberMetadata) ReadFrom(src []byte) error {
   256  	return s.readFrom(src, false)
   257  }
   258  
   259  // UnsafeReadFrom is the same as ReadFrom, but uses unsafe slice to string
   260  // conversions to reduce garbage.
   261  func (s *StickyMemberMetadata) UnsafeReadFrom(src []byte) error {
   262  	return s.readFrom(src, true)
   263  }
   264  
   265  func (s *StickyMemberMetadata) readFrom(src []byte, unsafe bool) error {
   266  	b := kbin.Reader{Src: src}
   267  	numAssignments := b.ArrayLen()
   268  	if numAssignments < 0 {
   269  		numAssignments = 0
   270  	}
   271  	need := numAssignments - int32(cap(s.CurrentAssignment))
   272  	if need > 0 {
   273  		s.CurrentAssignment = append(s.CurrentAssignment[:cap(s.CurrentAssignment)], make([]StickyMemberMetadataCurrentAssignment, need)...)
   274  	} else {
   275  		s.CurrentAssignment = s.CurrentAssignment[:numAssignments]
   276  	}
   277  	for i := int32(0); i < numAssignments; i++ {
   278  		var topic string
   279  		if unsafe {
   280  			topic = b.UnsafeString()
   281  		} else {
   282  			topic = b.String()
   283  		}
   284  		numPartitions := b.ArrayLen()
   285  		if numPartitions < 0 {
   286  			numPartitions = 0
   287  		}
   288  		a := &s.CurrentAssignment[i]
   289  		a.Topic = topic
   290  		need := numPartitions - int32(cap(a.Partitions))
   291  		if need > 0 {
   292  			a.Partitions = append(a.Partitions[:cap(a.Partitions)], make([]int32, need)...)
   293  		} else {
   294  			a.Partitions = a.Partitions[:numPartitions]
   295  		}
   296  		for i := range a.Partitions {
   297  			a.Partitions[i] = b.Int32()
   298  		}
   299  	}
   300  	if len(b.Src) > 0 {
   301  		s.Generation = b.Int32()
   302  	} else {
   303  		s.Generation = -1
   304  	}
   305  	return b.Complete()
   306  }
   307  
   308  // AppendTo appends sticky member metadata to dst.
   309  // If generation is not -1 (default for v0), this appends as version 1.
   310  func (s *StickyMemberMetadata) AppendTo(dst []byte) []byte {
   311  	dst = kbin.AppendArrayLen(dst, len(s.CurrentAssignment))
   312  	for _, assignment := range s.CurrentAssignment {
   313  		dst = kbin.AppendString(dst, assignment.Topic)
   314  		dst = kbin.AppendArrayLen(dst, len(assignment.Partitions))
   315  		for _, partition := range assignment.Partitions {
   316  			dst = kbin.AppendInt32(dst, partition)
   317  		}
   318  	}
   319  	if s.Generation != -1 {
   320  		dst = kbin.AppendInt32(dst, s.Generation)
   321  	}
   322  	return dst
   323  }
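
A round trip through AppendTo and ReadFrom, using the generated struct fields referenced above (a Generation of -1 keeps the v0 encoding; any other value encodes as v1):

	md := kmsg.StickyMemberMetadata{
		CurrentAssignment: []kmsg.StickyMemberMetadataCurrentAssignment{{
			Topic:      "events",
			Partitions: []int32{0, 1, 2},
		}},
		Generation: 5,
	}
	raw := md.AppendTo(nil)

	var decoded kmsg.StickyMemberMetadata
	if err := decoded.ReadFrom(raw); err != nil {
		// handle malformed metadata
	}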
   324  
   325  // TagReader is a type that has the ability to skip tags.
   326  //
   327  // This is effectively a trimmed version of the kbin.Reader, with the purpose
   328  // being that kmsg cannot depend on an external package.
   329  type TagReader interface {
   330  	// Uvarint returns a uint32. If the reader has read too much and has
   331  	// exhausted all bytes, this should set the reader's internal state
   332  	// to failed and return 0.
   333  	Uvarint() uint32
   334  
   335  	// Span returns n bytes from the reader. If the reader has read too
   336  	// much and exhausted all bytes this should set the reader's internal
   337  	// to failed and return nil.
   338  	Span(n int) []byte
   339  }
   340  
   341  // SkipTags skips tags in a TagReader.
   342  func SkipTags(b TagReader) {
   343  	for num := b.Uvarint(); num > 0; num-- {
   344  		_, size := b.Uvarint(), b.Uvarint()
   345  		b.Span(int(size))
   346  	}
   347  }
   348  
   349  // internalSkipTags skips tags in the duplicated inner kbin.Reader.
   350  func internalSkipTags(b *kbin.Reader) {
   351  	for num := b.Uvarint(); num > 0; num-- {
   352  		_, size := b.Uvarint(), b.Uvarint()
   353  		b.Span(int(size))
   354  	}
   355  }
   356  
   357  // ReadTags reads tags in a TagReader and returns the tags.
   358  func ReadTags(b TagReader) Tags {
   359  	var t Tags
   360  	for num := b.Uvarint(); num > 0; num-- {
   361  		key, size := b.Uvarint(), b.Uvarint()
   362  		t.Set(key, b.Span(int(size)))
   363  	}
   364  	return t
   365  }
   366  
   367  // internalReadTags reads tags in a reader and returns the tags from a
   368  // duplicated inner kbin.Reader.
   369  func internalReadTags(b *kbin.Reader) Tags {
   370  	var t Tags
   371  	for num := b.Uvarint(); num > 0; num-- {
   372  		key, size := b.Uvarint(), b.Uvarint()
   373  		t.Set(key, b.Span(int(size)))
   374  	}
   375  	return t
   376  }
   377  
   378  // Tags is an opaque structure capturing unparsed tags.
   379  type Tags struct {
   380  	keyvals map[uint32][]byte
   381  }
   382  
   383  // Len returns the number of keyvals in Tags.
   384  func (t *Tags) Len() int { return len(t.keyvals) }
   385  
   386  // Each calls fn for each key and val in the tags.
   387  func (t *Tags) Each(fn func(uint32, []byte)) {
   388  	if len(t.keyvals) == 0 {
   389  		return
   390  	}
   391  	// We must encode keys in order. We expect to have limited (no) unknown
   392  	// keys, so for now, we take a lazy approach and allocate an ordered
   393  	// slice.
   394  	ordered := make([]uint32, 0, len(t.keyvals))
   395  	for key := range t.keyvals {
   396  		ordered = append(ordered, key)
   397  	}
   398  	sort.Slice(ordered, func(i, j int) bool { return ordered[i] < ordered[j] })
   399  	for _, key := range ordered {
   400  		fn(key, t.keyvals[key])
   401  	}
   402  }
   403  
   404  // Set sets a tag's key and val.
   405  //
   406  // Note that serializing tags does NOT check if the set key overlaps with an
   407  // existing used key. It is invalid to set a key used by Kafka itself.
   408  func (t *Tags) Set(key uint32, val []byte) {
   409  	if t.keyvals == nil {
   410  		t.keyvals = make(map[uint32][]byte)
   411  	}
   412  	t.keyvals[key] = val
   413  }
   414  
   415  // AppendEach appends each keyval in tags to dst and returns the updated dst.
   416  func (t *Tags) AppendEach(dst []byte) []byte {
   417  	t.Each(func(key uint32, val []byte) {
   418  		dst = kbin.AppendUvarint(dst, key)
   419  		dst = kbin.AppendUvarint(dst, uint32(len(val)))
   420  		dst = append(dst, val...)
   421  	})
   422  	return dst
   423  }
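
A sketch of building and serializing unknown tags with the type above (keys 999 and 998 are arbitrary example keys, not keys Kafka uses):

	var tags kmsg.Tags
	tags.Set(999, []byte("opaque value"))
	tags.Set(998, []byte{0x01})

	// Each visits keys in ascending order; AppendEach serializes each
	// tag's key, value length, and value in that same order.
	tags.Each(func(key uint32, val []byte) {
		fmt.Printf("tag %d: %x\n", key, val)
	})
	encoded := tags.AppendEach(nil)
	// The leading tag count (a uvarint, as read by SkipTags and ReadTags
	// above) is appended separately by the caller before encoded.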
   424  
