1 // Package kmsg contains Kafka request and response types and autogenerated 2 // serialization and deserialization functions. 3 // 4 // This package may bump major versions whenever Kafka makes a backwards 5 // incompatible protocol change, per the types chosen for this package. For 6 // example, Kafka can change a field from non-nullable to nullable, which would 7 // require changing a field from a non-pointer to a pointer. We could get 8 // around this by making everything an opaque struct and having getters, but 9 // that is more tedious than having a few rare major version bumps. 10 // 11 // If you are using this package directly with kgo, you should either always 12 // use New functions, or Default functions after creating structs, or you 13 // should pin the max supported version. If you use New functions, you will 14 // have safe defaults as new fields are added. If you pin versions, you will 15 // avoid new fields being used. If you do neither of these, you may opt in to 16 // new fields that do not have safe zero value defaults, and this may lead to 17 // errors or unexpected results. 18 // 19 // Thus, whenever you initialize a struct from this package, do the following: 20 // 21 // struct := kmsg.NewFoo() 22 // struct.Field = "value I want to set" 23 // 24 // Most of this package is generated, but a few things are manual. What is 25 // manual: all interfaces, the RequestFormatter, record / message / record 26 // batch reading, and sticky member metadata serialization. 27 package kmsg 28 29 import ( 30 "context" 31 "sort" 32 33 "github.com/twmb/franz-go/pkg/kmsg/internal/kbin" 34 ) 35 36 //go:generate cp ../kbin/primitives.go internal/kbin/ 37 38 // Requestor issues requests. Notably, the kgo.Client and kgo.Broker implements 39 // Requestor. All Requests in this package have a RequestWith function to have 40 // type-safe requests. 41 type Requestor interface { 42 // Request issues a Request and returns either a Response or an error. 43 Request(context.Context, Request) (Response, error) 44 } 45 46 // Request represents a type that can be requested to Kafka. 47 type Request interface { 48 // Key returns the protocol key for this message kind. 49 Key() int16 50 // MaxVersion returns the maximum protocol version this message 51 // supports. 52 // 53 // This function allows one to implement a client that chooses message 54 // versions based off of the max of a message's max version in the 55 // client and the broker's max supported version. 56 MaxVersion() int16 57 // SetVersion sets the version to use for this request and response. 58 SetVersion(int16) 59 // GetVersion returns the version currently set to use for the request 60 // and response. 61 GetVersion() int16 62 // IsFlexible returns whether the request at its current version is 63 // "flexible" as per the KIP-482. 64 IsFlexible() bool 65 // AppendTo appends this message in wire protocol form to a slice and 66 // returns the slice. 67 AppendTo([]byte) []byte 68 // ReadFrom parses all of the input slice into the response type. 69 // 70 // This should return an error if too little data is input. 71 ReadFrom([]byte) error 72 // ResponseKind returns an empty Response that is expected for 73 // this message request. 74 ResponseKind() Response 75 } 76 77 // AdminRequest represents a request that must be issued to Kafka controllers. 78 type AdminRequest interface { 79 // IsAdminRequest is a method attached to requests that must be 80 // issed to Kafka controllers. 81 IsAdminRequest() 82 Request 83 } 84 85 // GroupCoordinatorRequest represents a request that must be issued to a 86 // group coordinator. 87 type GroupCoordinatorRequest interface { 88 // IsGroupCoordinatorRequest is a method attached to requests that 89 // must be issued to group coordinators. 90 IsGroupCoordinatorRequest() 91 Request 92 } 93 94 // TxnCoordinatorRequest represents a request that must be issued to a 95 // transaction coordinator. 96 type TxnCoordinatorRequest interface { 97 // IsTxnCoordinatorRequest is a method attached to requests that 98 // must be issued to transaction coordinators. 99 IsTxnCoordinatorRequest() 100 Request 101 } 102 103 // Response represents a type that Kafka responds with. 104 type Response interface { 105 // Key returns the protocol key for this message kind. 106 Key() int16 107 // MaxVersion returns the maximum protocol version this message 108 // supports. 109 MaxVersion() int16 110 // SetVersion sets the version to use for this request and response. 111 SetVersion(int16) 112 // GetVersion returns the version currently set to use for the request 113 // and response. 114 GetVersion() int16 115 // IsFlexible returns whether the request at its current version is 116 // "flexible" as per the KIP-482. 117 IsFlexible() bool 118 // AppendTo appends this message in wire protocol form to a slice and 119 // returns the slice. 120 AppendTo([]byte) []byte 121 // ReadFrom parses all of the input slice into the response type. 122 // 123 // This should return an error if too little data is input. 124 ReadFrom([]byte) error 125 // RequestKind returns an empty Request that is expected for 126 // this message request. 127 RequestKind() Request 128 } 129 130 // UnsafeReadFrom, implemented by all requests and responses generated in this 131 // package, switches to using unsafe slice-to-string conversions when reading. 132 // This can be used to avoid a lot of garbage, but it means to have to be 133 // careful when using any strings in structs: if you hold onto the string, the 134 // underlying response slice will not be garbage collected. 135 type UnsafeReadFrom interface { 136 UnsafeReadFrom([]byte) error 137 } 138 139 // ThrottleResponse represents a response that could have a throttle applied by 140 // Kafka. Any response that implements ThrottleResponse also implements 141 // SetThrottleResponse. 142 // 143 // Kafka 2.0.0 switched throttles from being applied before responses to being 144 // applied after responses. 145 type ThrottleResponse interface { 146 // Throttle returns the response's throttle millis value and 147 // whether Kafka applies the throttle after the response. 148 Throttle() (int32, bool) 149 } 150 151 // SetThrottleResponse sets the throttle in a response that can have a throttle 152 // applied. Any kmsg interface that implements ThrottleResponse also implements 153 // SetThrottleResponse. 154 type SetThrottleResponse interface { 155 // SetThrottle sets the response's throttle millis value. 156 SetThrottle(int32) 157 } 158 159 // TimeoutRequest represents a request that has a TimeoutMillis field. 160 // Any request that implements TimeoutRequest also implements SetTimeoutRequest. 161 type TimeoutRequest interface { 162 // Timeout returns the request's timeout millis value. 163 Timeout() int32 164 } 165 166 // SetTimeoutRequest sets the timeout in a request that can have a timeout 167 // applied. Any kmsg interface that implements ThrottleRequest also implements 168 // SetThrottleRequest. 169 type SetTimeoutRequest interface { 170 // SetTimeout sets the request's timeout millis value. 171 SetTimeout(timeoutMillis int32) 172 } 173 174 // RequestFormatter formats requests. 175 // 176 // The default empty struct works correctly, but can be extended with the 177 // NewRequestFormatter function. 178 type RequestFormatter struct { 179 clientID *string 180 } 181 182 // RequestFormatterOpt applys options to a RequestFormatter. 183 type RequestFormatterOpt interface { 184 apply(*RequestFormatter) 185 } 186 187 type formatterOpt struct{ fn func(*RequestFormatter) } 188 189 func (opt formatterOpt) apply(f *RequestFormatter) { opt.fn(f) } 190 191 // FormatterClientID attaches the given client ID to any issued request, 192 // minus controlled shutdown v0, which uses its own special format. 193 func FormatterClientID(id string) RequestFormatterOpt { 194 return formatterOpt{func(f *RequestFormatter) { f.clientID = &id }} 195 } 196 197 // NewRequestFormatter returns a RequestFormatter with the opts applied. 198 func NewRequestFormatter(opts ...RequestFormatterOpt) *RequestFormatter { 199 a := new(RequestFormatter) 200 for _, opt := range opts { 201 opt.apply(a) 202 } 203 return a 204 } 205 206 // AppendRequest appends a full message request to dst, returning the updated 207 // slice. This message is the full body that needs to be written to issue a 208 // Kafka request. 209 func (f *RequestFormatter) AppendRequest( 210 dst []byte, 211 r Request, 212 correlationID int32, 213 ) []byte { 214 dst = append(dst, 0, 0, 0, 0) // reserve length 215 k := r.Key() 216 v := r.GetVersion() 217 dst = kbin.AppendInt16(dst, k) 218 dst = kbin.AppendInt16(dst, v) 219 dst = kbin.AppendInt32(dst, correlationID) 220 if k == 7 && v == 0 { 221 return dst 222 } 223 224 // Even with flexible versions, we do not use a compact client id. 225 // Clients issue ApiVersions immediately before knowing the broker 226 // version, and old brokers will not be able to understand a compact 227 // client id. 228 dst = kbin.AppendNullableString(dst, f.clientID) 229 230 // The flexible tags end the request header, and then begins the 231 // request body. 232 if r.IsFlexible() { 233 var numTags uint8 234 dst = append(dst, numTags) 235 if numTags != 0 { 236 // TODO when tags are added 237 } 238 } 239 240 // Now the request body. 241 dst = r.AppendTo(dst) 242 243 kbin.AppendInt32(dst[:0], int32(len(dst[4:]))) 244 return dst 245 } 246 247 // StringPtr is a helper to return a pointer to a string. 248 func StringPtr(in string) *string { 249 return &in 250 } 251 252 // ReadFrom provides decoding various versions of sticky member metadata. A key 253 // point of this type is that it does not contain a version number inside it, 254 // but it is versioned: if decoding v1 fails, this falls back to v0. 255 func (s *StickyMemberMetadata) ReadFrom(src []byte) error { 256 return s.readFrom(src, false) 257 } 258 259 // UnsafeReadFrom is the same as ReadFrom, but uses unsafe slice to string 260 // conversions to reduce garbage. 261 func (s *StickyMemberMetadata) UnsafeReadFrom(src []byte) error { 262 return s.readFrom(src, true) 263 } 264 265 func (s *StickyMemberMetadata) readFrom(src []byte, unsafe bool) error { 266 b := kbin.Reader{Src: src} 267 numAssignments := b.ArrayLen() 268 if numAssignments < 0 { 269 numAssignments = 0 270 } 271 need := numAssignments - int32(cap(s.CurrentAssignment)) 272 if need > 0 { 273 s.CurrentAssignment = append(s.CurrentAssignment[:cap(s.CurrentAssignment)], make([]StickyMemberMetadataCurrentAssignment, need)...) 274 } else { 275 s.CurrentAssignment = s.CurrentAssignment[:numAssignments] 276 } 277 for i := int32(0); i < numAssignments; i++ { 278 var topic string 279 if unsafe { 280 topic = b.UnsafeString() 281 } else { 282 topic = b.String() 283 } 284 numPartitions := b.ArrayLen() 285 if numPartitions < 0 { 286 numPartitions = 0 287 } 288 a := &s.CurrentAssignment[i] 289 a.Topic = topic 290 need := numPartitions - int32(cap(a.Partitions)) 291 if need > 0 { 292 a.Partitions = append(a.Partitions[:cap(a.Partitions)], make([]int32, need)...) 293 } else { 294 a.Partitions = a.Partitions[:numPartitions] 295 } 296 for i := range a.Partitions { 297 a.Partitions[i] = b.Int32() 298 } 299 } 300 if len(b.Src) > 0 { 301 s.Generation = b.Int32() 302 } else { 303 s.Generation = -1 304 } 305 return b.Complete() 306 } 307 308 // AppendTo provides appending various versions of sticky member metadata to dst. 309 // If generation is not -1 (default for v0), this appends as version 1. 310 func (s *StickyMemberMetadata) AppendTo(dst []byte) []byte { 311 dst = kbin.AppendArrayLen(dst, len(s.CurrentAssignment)) 312 for _, assignment := range s.CurrentAssignment { 313 dst = kbin.AppendString(dst, assignment.Topic) 314 dst = kbin.AppendArrayLen(dst, len(assignment.Partitions)) 315 for _, partition := range assignment.Partitions { 316 dst = kbin.AppendInt32(dst, partition) 317 } 318 } 319 if s.Generation != -1 { 320 dst = kbin.AppendInt32(dst, s.Generation) 321 } 322 return dst 323 } 324 325 // TagReader has is a type that has the ability to skip tags. 326 // 327 // This is effectively a trimmed version of the kbin.Reader, with the purpose 328 // being that kmsg cannot depend on an external package. 329 type TagReader interface { 330 // Uvarint returns a uint32. If the reader has read too much and has 331 // exhausted all bytes, this should set the reader's internal state 332 // to failed and return 0. 333 Uvarint() uint32 334 335 // Span returns n bytes from the reader. If the reader has read too 336 // much and exhausted all bytes this should set the reader's internal 337 // to failed and return nil. 338 Span(n int) []byte 339 } 340 341 // SkipTags skips tags in a TagReader. 342 func SkipTags(b TagReader) { 343 for num := b.Uvarint(); num > 0; num-- { 344 _, size := b.Uvarint(), b.Uvarint() 345 b.Span(int(size)) 346 } 347 } 348 349 // internalSkipTags skips tags in the duplicated inner kbin.Reader. 350 func internalSkipTags(b *kbin.Reader) { 351 for num := b.Uvarint(); num > 0; num-- { 352 _, size := b.Uvarint(), b.Uvarint() 353 b.Span(int(size)) 354 } 355 } 356 357 // ReadTags reads tags in a TagReader and returns the tags. 358 func ReadTags(b TagReader) Tags { 359 var t Tags 360 for num := b.Uvarint(); num > 0; num-- { 361 key, size := b.Uvarint(), b.Uvarint() 362 t.Set(key, b.Span(int(size))) 363 } 364 return t 365 } 366 367 // internalReadTags reads tags in a reader and returns the tags from a 368 // duplicated inner kbin.Reader. 369 func internalReadTags(b *kbin.Reader) Tags { 370 var t Tags 371 for num := b.Uvarint(); num > 0; num-- { 372 key, size := b.Uvarint(), b.Uvarint() 373 t.Set(key, b.Span(int(size))) 374 } 375 return t 376 } 377 378 // Tags is an opaque structure capturing unparsed tags. 379 type Tags struct { 380 keyvals map[uint32][]byte 381 } 382 383 // Len returns the number of keyvals in Tags. 384 func (t *Tags) Len() int { return len(t.keyvals) } 385 386 // Each calls fn for each key and val in the tags. 387 func (t *Tags) Each(fn func(uint32, []byte)) { 388 if len(t.keyvals) == 0 { 389 return 390 } 391 // We must encode keys in order. We expect to have limited (no) unknown 392 // keys, so for now, we take a lazy approach and allocate an ordered 393 // slice. 394 ordered := make([]uint32, 0, len(t.keyvals)) 395 for key := range t.keyvals { 396 ordered = append(ordered, key) 397 } 398 sort.Slice(ordered, func(i, j int) bool { return ordered[i] < ordered[j] }) 399 for _, key := range ordered { 400 fn(key, t.keyvals[key]) 401 } 402 } 403 404 // Set sets a tag's key and val. 405 // 406 // Note that serializing tags does NOT check if the set key overlaps with an 407 // existing used key. It is invalid to set a key used by Kafka itself. 408 func (t *Tags) Set(key uint32, val []byte) { 409 if t.keyvals == nil { 410 t.keyvals = make(map[uint32][]byte) 411 } 412 t.keyvals[key] = val 413 } 414 415 // AppendEach appends each keyval in tags to dst and returns the updated dst. 416 func (t *Tags) AppendEach(dst []byte) []byte { 417 t.Each(func(key uint32, val []byte) { 418 dst = kbin.AppendUvarint(dst, key) 419 dst = kbin.AppendUvarint(dst, uint32(len(val))) 420 dst = append(dst, val...) 421 }) 422 return dst 423 } 424