...

Source file src/github.com/twmb/franz-go/pkg/kmsg/internal/kbin/primitives.go

Documentation: github.com/twmb/franz-go/pkg/kmsg/internal/kbin

     1  // Package kbin contains Kafka primitive reading and writing functions.
     2  package kbin
     3  
     4  import (
     5  	"encoding/binary"
     6  	"errors"
     7  	"math"
     8  	"math/bits"
     9  	"reflect"
    10  	"unsafe"
    11  )
    12  
    13  // This file contains primitive type encoding and decoding.
    14  //
// The Reader helper can be used even when content runs out
// or an error is hit; all subsequent requests will return
// zero values so the rest of a decode will basically no-op.
    18  
// ErrNotEnoughData is returned when a type could not fully decode
// from a slice because the slice did not have enough data. It is the
// error that Reader.Complete reports once a reader has been invalidated.
var ErrNotEnoughData = errors.New("response did not contain enough data to be valid")
    22  
    23  // AppendBool appends 1 for true or 0 for false to dst.
    24  func AppendBool(dst []byte, v bool) []byte {
    25  	if v {
    26  		return append(dst, 1)
    27  	}
    28  	return append(dst, 0)
    29  }
    30  
    31  // AppendInt8 appends an int8 to dst.
    32  func AppendInt8(dst []byte, i int8) []byte {
    33  	return append(dst, byte(i))
    34  }
    35  
    36  // AppendInt16 appends a big endian int16 to dst.
    37  func AppendInt16(dst []byte, i int16) []byte {
    38  	return AppendUint16(dst, uint16(i))
    39  }
    40  
    41  // AppendUint16 appends a big endian uint16 to dst.
    42  func AppendUint16(dst []byte, u uint16) []byte {
    43  	return append(dst, byte(u>>8), byte(u))
    44  }
    45  
    46  // AppendInt32 appends a big endian int32 to dst.
    47  func AppendInt32(dst []byte, i int32) []byte {
    48  	return AppendUint32(dst, uint32(i))
    49  }
    50  
    51  // AppendInt64 appends a big endian int64 to dst.
    52  func AppendInt64(dst []byte, i int64) []byte {
    53  	return appendUint64(dst, uint64(i))
    54  }
    55  
    56  // AppendFloat64 appends a big endian float64 to dst.
    57  func AppendFloat64(dst []byte, f float64) []byte {
    58  	return appendUint64(dst, math.Float64bits(f))
    59  }
    60  
    61  // AppendUuid appends the 16 uuid bytes to dst.
    62  func AppendUuid(dst []byte, uuid [16]byte) []byte {
    63  	return append(dst, uuid[:]...)
    64  }
    65  
    66  func appendUint64(dst []byte, u uint64) []byte {
    67  	return append(dst, byte(u>>56), byte(u>>48), byte(u>>40), byte(u>>32),
    68  		byte(u>>24), byte(u>>16), byte(u>>8), byte(u))
    69  }
    70  
    71  // AppendUint32 appends a big endian uint32 to dst.
    72  func AppendUint32(dst []byte, u uint32) []byte {
    73  	return append(dst, byte(u>>24), byte(u>>16), byte(u>>8), byte(u))
    74  }
    75  
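// uvarintLens maps the bit length of a value (the result of bits.Len32 or
// bits.Len64, so 0 through 64) to the number of bytes its uvarint encoding
// takes. The table could only be length 65, but using 256 allows bounds check
// elimination on lookup when it is indexed with a byte.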
    78  const uvarintLens = "\x01\x01\x01\x01\x01\x01\x01\x01\x02\x02\x02\x02\x02\x02\x02\x03\x03\x03\x03\x03\x03\x03\x04\x04\x04\x04\x04\x04\x04\x05\x05\x05\x05\x05\x05\x05\x06\x06\x06\x06\x06\x06\x06\x07\x07\x07\x07\x07\x07\x07\x08\x08\x08\x08\x08\x08\x08\x09\x09\x09\x09\x09\x09\x09\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    79  
    80  // VarintLen returns how long i would be if it were varint encoded.
    81  func VarintLen(i int32) int {
    82  	u := uint32(i)<<1 ^ uint32(i>>31)
    83  	return UvarintLen(u)
    84  }
    85  
    86  // UvarintLen returns how long u would be if it were uvarint encoded.
    87  func UvarintLen(u uint32) int {
    88  	return int(uvarintLens[byte(bits.Len32(u))])
    89  }
    90  
    91  func uvarlongLen(u uint64) int {
    92  	return int(uvarintLens[byte(bits.Len64(u))])
    93  }
    94  
    95  // Varint is a loop unrolled 32 bit varint decoder. The return semantics
    96  // are the same as binary.Varint, with the added benefit that overflows
    97  // in 5 byte encodings are handled rather than left to the user.
    98  func Varint(in []byte) (int32, int) {
    99  	x, n := Uvarint(in)
   100  	return int32((x >> 1) ^ -(x & 1)), n
   101  }
   102  
   103  // Uvarint is a loop unrolled 32 bit uvarint decoder. The return semantics
   104  // are the same as binary.Uvarint, with the added benefit that overflows
   105  // in 5 byte encodings are handled rather than left to the user.
   106  func Uvarint(in []byte) (uint32, int) {
   107  	var x uint32
   108  	var overflow int
   109  
   110  	if len(in) < 1 {
   111  		goto fail
   112  	}
   113  
   114  	x = uint32(in[0] & 0x7f)
   115  	if in[0]&0x80 == 0 {
   116  		return x, 1
   117  	} else if len(in) < 2 {
   118  		goto fail
   119  	}
   120  
   121  	x |= uint32(in[1]&0x7f) << 7
   122  	if in[1]&0x80 == 0 {
   123  		return x, 2
   124  	} else if len(in) < 3 {
   125  		goto fail
   126  	}
   127  
   128  	x |= uint32(in[2]&0x7f) << 14
   129  	if in[2]&0x80 == 0 {
   130  		return x, 3
   131  	} else if len(in) < 4 {
   132  		goto fail
   133  	}
   134  
   135  	x |= uint32(in[3]&0x7f) << 21
   136  	if in[3]&0x80 == 0 {
   137  		return x, 4
   138  	} else if len(in) < 5 {
   139  		goto fail
   140  	}
   141  
   142  	x |= uint32(in[4]) << 28
   143  	if in[4] <= 0x0f {
   144  		return x, 5
   145  	}
   146  
   147  	overflow = -5
   148  
   149  fail:
   150  	return 0, overflow
   151  }
   152  
   153  // Varlong is a loop unrolled 64 bit varint decoder. The return semantics
   154  // are the same as binary.Varint, with the added benefit that overflows
   155  // in 10 byte encodings are handled rather than left to the user.
   156  func Varlong(in []byte) (int64, int) {
   157  	x, n := uvarlong(in)
   158  	return int64((x >> 1) ^ -(x & 1)), n
   159  }
   160  
   161  func uvarlong(in []byte) (uint64, int) {
   162  	var x uint64
   163  	var overflow int
   164  
   165  	if len(in) < 1 {
   166  		goto fail
   167  	}
   168  
   169  	x = uint64(in[0] & 0x7f)
   170  	if in[0]&0x80 == 0 {
   171  		return x, 1
   172  	} else if len(in) < 2 {
   173  		goto fail
   174  	}
   175  
   176  	x |= uint64(in[1]&0x7f) << 7
   177  	if in[1]&0x80 == 0 {
   178  		return x, 2
   179  	} else if len(in) < 3 {
   180  		goto fail
   181  	}
   182  
   183  	x |= uint64(in[2]&0x7f) << 14
   184  	if in[2]&0x80 == 0 {
   185  		return x, 3
   186  	} else if len(in) < 4 {
   187  		goto fail
   188  	}
   189  
   190  	x |= uint64(in[3]&0x7f) << 21
   191  	if in[3]&0x80 == 0 {
   192  		return x, 4
   193  	} else if len(in) < 5 {
   194  		goto fail
   195  	}
   196  
   197  	x |= uint64(in[4]&0x7f) << 28
   198  	if in[4]&0x80 == 0 {
   199  		return x, 5
   200  	} else if len(in) < 6 {
   201  		goto fail
   202  	}
   203  
   204  	x |= uint64(in[5]&0x7f) << 35
   205  	if in[5]&0x80 == 0 {
   206  		return x, 6
   207  	} else if len(in) < 7 {
   208  		goto fail
   209  	}
   210  
   211  	x |= uint64(in[6]&0x7f) << 42
   212  	if in[6]&0x80 == 0 {
   213  		return x, 7
   214  	} else if len(in) < 8 {
   215  		goto fail
   216  	}
   217  
   218  	x |= uint64(in[7]&0x7f) << 49
   219  	if in[7]&0x80 == 0 {
   220  		return x, 8
   221  	} else if len(in) < 9 {
   222  		goto fail
   223  	}
   224  
   225  	x |= uint64(in[8]&0x7f) << 56
   226  	if in[8]&0x80 == 0 {
   227  		return x, 9
   228  	} else if len(in) < 10 {
   229  		goto fail
   230  	}
   231  
   232  	x |= uint64(in[9]) << 63
   233  	if in[9] <= 0x01 {
   234  		return x, 10
   235  	}
   236  
   237  	overflow = -10
   238  
   239  fail:
   240  	return 0, overflow
   241  }
   242  
   243  // AppendVarint appends a varint encoded i to dst.
   244  func AppendVarint(dst []byte, i int32) []byte {
   245  	return AppendUvarint(dst, uint32(i)<<1^uint32(i>>31))
   246  }
   247  
   248  // AppendUvarint appends a uvarint encoded u to dst.
   249  func AppendUvarint(dst []byte, u uint32) []byte {
   250  	switch UvarintLen(u) {
   251  	case 5:
   252  		return append(dst,
   253  			byte(u&0x7f|0x80),
   254  			byte((u>>7)&0x7f|0x80),
   255  			byte((u>>14)&0x7f|0x80),
   256  			byte((u>>21)&0x7f|0x80),
   257  			byte(u>>28))
   258  	case 4:
   259  		return append(dst,
   260  			byte(u&0x7f|0x80),
   261  			byte((u>>7)&0x7f|0x80),
   262  			byte((u>>14)&0x7f|0x80),
   263  			byte(u>>21))
   264  	case 3:
   265  		return append(dst,
   266  			byte(u&0x7f|0x80),
   267  			byte((u>>7)&0x7f|0x80),
   268  			byte(u>>14))
   269  	case 2:
   270  		return append(dst,
   271  			byte(u&0x7f|0x80),
   272  			byte(u>>7))
   273  	case 1:
   274  		return append(dst, byte(u))
   275  	}
   276  	return dst
   277  }
   278  
   279  // AppendVarlong appends a varint encoded i to dst.
   280  func AppendVarlong(dst []byte, i int64) []byte {
   281  	return appendUvarlong(dst, uint64(i)<<1^uint64(i>>63))
   282  }
   283  
   284  func appendUvarlong(dst []byte, u uint64) []byte {
   285  	switch uvarlongLen(u) {
   286  	case 10:
   287  		return append(dst,
   288  			byte(u&0x7f|0x80),
   289  			byte((u>>7)&0x7f|0x80),
   290  			byte((u>>14)&0x7f|0x80),
   291  			byte((u>>21)&0x7f|0x80),
   292  			byte((u>>28)&0x7f|0x80),
   293  			byte((u>>35)&0x7f|0x80),
   294  			byte((u>>42)&0x7f|0x80),
   295  			byte((u>>49)&0x7f|0x80),
   296  			byte((u>>56)&0x7f|0x80),
   297  			byte(u>>63))
   298  	case 9:
   299  		return append(dst,
   300  			byte(u&0x7f|0x80),
   301  			byte((u>>7)&0x7f|0x80),
   302  			byte((u>>14)&0x7f|0x80),
   303  			byte((u>>21)&0x7f|0x80),
   304  			byte((u>>28)&0x7f|0x80),
   305  			byte((u>>35)&0x7f|0x80),
   306  			byte((u>>42)&0x7f|0x80),
   307  			byte((u>>49)&0x7f|0x80),
   308  			byte(u>>56))
   309  	case 8:
   310  		return append(dst,
   311  			byte(u&0x7f|0x80),
   312  			byte((u>>7)&0x7f|0x80),
   313  			byte((u>>14)&0x7f|0x80),
   314  			byte((u>>21)&0x7f|0x80),
   315  			byte((u>>28)&0x7f|0x80),
   316  			byte((u>>35)&0x7f|0x80),
   317  			byte((u>>42)&0x7f|0x80),
   318  			byte(u>>49))
   319  	case 7:
   320  		return append(dst,
   321  			byte(u&0x7f|0x80),
   322  			byte((u>>7)&0x7f|0x80),
   323  			byte((u>>14)&0x7f|0x80),
   324  			byte((u>>21)&0x7f|0x80),
   325  			byte((u>>28)&0x7f|0x80),
   326  			byte((u>>35)&0x7f|0x80),
   327  			byte(u>>42))
   328  	case 6:
   329  		return append(dst,
   330  			byte(u&0x7f|0x80),
   331  			byte((u>>7)&0x7f|0x80),
   332  			byte((u>>14)&0x7f|0x80),
   333  			byte((u>>21)&0x7f|0x80),
   334  			byte((u>>28)&0x7f|0x80),
   335  			byte(u>>35))
   336  	case 5:
   337  		return append(dst,
   338  			byte(u&0x7f|0x80),
   339  			byte((u>>7)&0x7f|0x80),
   340  			byte((u>>14)&0x7f|0x80),
   341  			byte((u>>21)&0x7f|0x80),
   342  			byte(u>>28))
   343  	case 4:
   344  		return append(dst,
   345  			byte(u&0x7f|0x80),
   346  			byte((u>>7)&0x7f|0x80),
   347  			byte((u>>14)&0x7f|0x80),
   348  			byte(u>>21))
   349  	case 3:
   350  		return append(dst,
   351  			byte(u&0x7f|0x80),
   352  			byte((u>>7)&0x7f|0x80),
   353  			byte(u>>14))
   354  	case 2:
   355  		return append(dst,
   356  			byte(u&0x7f|0x80),
   357  			byte(u>>7))
   358  	case 1:
   359  		return append(dst, byte(u))
   360  	}
   361  	return dst
   362  }
   363  
   364  // AppendString appends a string to dst prefixed with its int16 length.
   365  func AppendString(dst []byte, s string) []byte {
   366  	dst = AppendInt16(dst, int16(len(s)))
   367  	return append(dst, s...)
   368  }
   369  
   370  // AppendCompactString appends a string to dst prefixed with its uvarint length
   371  // starting at 1; 0 is reserved for null, which compact strings are not
   372  // (nullable compact ones are!). Thus, the length is the decoded uvarint - 1.
   373  //
   374  // For KIP-482.
   375  func AppendCompactString(dst []byte, s string) []byte {
   376  	dst = AppendUvarint(dst, 1+uint32(len(s)))
   377  	return append(dst, s...)
   378  }
   379  
   380  // AppendNullableString appends potentially nil string to dst prefixed with its
   381  // int16 length or int16(-1) if nil.
   382  func AppendNullableString(dst []byte, s *string) []byte {
   383  	if s == nil {
   384  		return AppendInt16(dst, -1)
   385  	}
   386  	return AppendString(dst, *s)
   387  }
   388  
   389  // AppendCompactNullableString appends a potentially nil string to dst with its
   390  // uvarint length starting at 1, with 0 indicating null. Thus, the length is
   391  // the decoded uvarint - 1.
   392  //
   393  // For KIP-482.
   394  func AppendCompactNullableString(dst []byte, s *string) []byte {
   395  	if s == nil {
   396  		return AppendUvarint(dst, 0)
   397  	}
   398  	return AppendCompactString(dst, *s)
   399  }
   400  
   401  // AppendBytes appends bytes to dst prefixed with its int32 length.
   402  func AppendBytes(dst, b []byte) []byte {
   403  	dst = AppendInt32(dst, int32(len(b)))
   404  	return append(dst, b...)
   405  }
   406  
   407  // AppendCompactBytes appends bytes to dst prefixed with a its uvarint length
   408  // starting at 1; 0 is reserved for null, which compact bytes are not (nullable
   409  // compact ones are!). Thus, the length is the decoded uvarint - 1.
   410  //
   411  // For KIP-482.
   412  func AppendCompactBytes(dst, b []byte) []byte {
   413  	dst = AppendUvarint(dst, 1+uint32(len(b)))
   414  	return append(dst, b...)
   415  }
   416  
   417  // AppendNullableBytes appends a potentially nil slice to dst prefixed with its
   418  // int32 length or int32(-1) if nil.
   419  func AppendNullableBytes(dst, b []byte) []byte {
   420  	if b == nil {
   421  		return AppendInt32(dst, -1)
   422  	}
   423  	return AppendBytes(dst, b)
   424  }
   425  
   426  // AppendCompactNullableBytes appends a potentially nil slice to dst with its
   427  // uvarint length starting at 1, with 0 indicating null. Thus, the length is
   428  // the decoded uvarint - 1.
   429  //
   430  // For KIP-482.
   431  func AppendCompactNullableBytes(dst, b []byte) []byte {
   432  	if b == nil {
   433  		return AppendUvarint(dst, 0)
   434  	}
   435  	return AppendCompactBytes(dst, b)
   436  }
   437  
   438  // AppendVarintString appends a string to dst prefixed with its length encoded
   439  // as a varint.
   440  func AppendVarintString(dst []byte, s string) []byte {
   441  	dst = AppendVarint(dst, int32(len(s)))
   442  	return append(dst, s...)
   443  }
   444  
   445  // AppendVarintBytes appends a slice to dst prefixed with its length encoded as
   446  // a varint.
   447  func AppendVarintBytes(dst, b []byte) []byte {
   448  	if b == nil {
   449  		return AppendVarint(dst, -1)
   450  	}
   451  	dst = AppendVarint(dst, int32(len(b)))
   452  	return append(dst, b...)
   453  }
   454  
   455  // AppendArrayLen appends the length of an array as an int32 to dst.
   456  func AppendArrayLen(dst []byte, l int) []byte {
   457  	return AppendInt32(dst, int32(l))
   458  }
   459  
   460  // AppendCompactArrayLen appends the length of an array as a uvarint to dst
   461  // as the length + 1.
   462  //
   463  // For KIP-482.
   464  func AppendCompactArrayLen(dst []byte, l int) []byte {
   465  	return AppendUvarint(dst, 1+uint32(l))
   466  }
   467  
   468  // AppendNullableArrayLen appends the length of an array as an int32 to dst,
   469  // or -1 if isNil is true.
   470  func AppendNullableArrayLen(dst []byte, l int, isNil bool) []byte {
   471  	if isNil {
   472  		return AppendInt32(dst, -1)
   473  	}
   474  	return AppendInt32(dst, int32(l))
   475  }
   476  
   477  // AppendCompactNullableArrayLen appends the length of an array as a uvarint to
   478  // dst as the length + 1; if isNil is true, this appends 0 as a uvarint.
   479  //
   480  // For KIP-482.
   481  func AppendCompactNullableArrayLen(dst []byte, l int, isNil bool) []byte {
   482  	if isNil {
   483  		return AppendUvarint(dst, 0)
   484  	}
   485  	return AppendUvarint(dst, 1+uint32(l))
   486  }
   487  
// Reader is used to decode Kafka messages.
//
// Decoding functions consume bytes from the front of Src. If a function
// cannot decode its value, the reader is invalidated and Src is set to nil.
//
// For all functions on Reader, if the reader has been invalidated, functions
// return defaults (false, 0, nil, ""). Use Complete or Ok to detect if the
// reader was invalidated; neither reports leftover input, so inspect Src to
// see whether data remains after decoding.
type Reader struct {
	// Src is the input that has not been decoded yet.
	Src []byte
	// bad records that a decode failed.
	bad bool
}
   497  
   498  // Bool returns a bool from the reader.
   499  func (b *Reader) Bool() bool {
   500  	if len(b.Src) < 1 {
   501  		b.bad = true
   502  		b.Src = nil
   503  		return false
   504  	}
   505  	t := b.Src[0] != 0 // if '0', false
   506  	b.Src = b.Src[1:]
   507  	return t
   508  }
   509  
   510  // Int8 returns an int8 from the reader.
   511  func (b *Reader) Int8() int8 {
   512  	if len(b.Src) < 1 {
   513  		b.bad = true
   514  		b.Src = nil
   515  		return 0
   516  	}
   517  	r := b.Src[0]
   518  	b.Src = b.Src[1:]
   519  	return int8(r)
   520  }
   521  
   522  // Int16 returns an int16 from the reader.
   523  func (b *Reader) Int16() int16 {
   524  	if len(b.Src) < 2 {
   525  		b.bad = true
   526  		b.Src = nil
   527  		return 0
   528  	}
   529  	r := int16(binary.BigEndian.Uint16(b.Src))
   530  	b.Src = b.Src[2:]
   531  	return r
   532  }
   533  
   534  // Uint16 returns an uint16 from the reader.
   535  func (b *Reader) Uint16() uint16 {
   536  	if len(b.Src) < 2 {
   537  		b.bad = true
   538  		b.Src = nil
   539  		return 0
   540  	}
   541  	r := binary.BigEndian.Uint16(b.Src)
   542  	b.Src = b.Src[2:]
   543  	return r
   544  }
   545  
   546  // Int32 returns an int32 from the reader.
   547  func (b *Reader) Int32() int32 {
   548  	if len(b.Src) < 4 {
   549  		b.bad = true
   550  		b.Src = nil
   551  		return 0
   552  	}
   553  	r := int32(binary.BigEndian.Uint32(b.Src))
   554  	b.Src = b.Src[4:]
   555  	return r
   556  }
   557  
   558  // Int64 returns an int64 from the reader.
   559  func (b *Reader) Int64() int64 {
   560  	return int64(b.readUint64())
   561  }
   562  
   563  // Uuid returns a uuid from the reader.
   564  func (b *Reader) Uuid() [16]byte {
   565  	var r [16]byte
   566  	copy(r[:], b.Span(16))
   567  	return r
   568  }
   569  
   570  // Float64 returns a float64 from the reader.
   571  func (b *Reader) Float64() float64 {
   572  	return math.Float64frombits(b.readUint64())
   573  }
   574  
   575  func (b *Reader) readUint64() uint64 {
   576  	if len(b.Src) < 8 {
   577  		b.bad = true
   578  		b.Src = nil
   579  		return 0
   580  	}
   581  	r := binary.BigEndian.Uint64(b.Src)
   582  	b.Src = b.Src[8:]
   583  	return r
   584  }
   585  
   586  // Uint32 returns a uint32 from the reader.
   587  func (b *Reader) Uint32() uint32 {
   588  	if len(b.Src) < 4 {
   589  		b.bad = true
   590  		b.Src = nil
   591  		return 0
   592  	}
   593  	r := binary.BigEndian.Uint32(b.Src)
   594  	b.Src = b.Src[4:]
   595  	return r
   596  }
   597  
   598  // Varint returns a varint int32 from the reader.
   599  func (b *Reader) Varint() int32 {
   600  	val, n := Varint(b.Src)
   601  	if n <= 0 {
   602  		b.bad = true
   603  		b.Src = nil
   604  		return 0
   605  	}
   606  	b.Src = b.Src[n:]
   607  	return val
   608  }
   609  
   610  // Varlong returns a varlong int64 from the reader.
   611  func (b *Reader) Varlong() int64 {
   612  	val, n := Varlong(b.Src)
   613  	if n <= 0 {
   614  		b.bad = true
   615  		b.Src = nil
   616  		return 0
   617  	}
   618  	b.Src = b.Src[n:]
   619  	return val
   620  }
   621  
   622  // Uvarint returns a uvarint encoded uint32 from the reader.
   623  func (b *Reader) Uvarint() uint32 {
   624  	val, n := Uvarint(b.Src)
   625  	if n <= 0 {
   626  		b.bad = true
   627  		b.Src = nil
   628  		return 0
   629  	}
   630  	b.Src = b.Src[n:]
   631  	return val
   632  }
   633  
   634  // Span returns l bytes from the reader.
   635  func (b *Reader) Span(l int) []byte {
   636  	if len(b.Src) < l || l < 0 {
   637  		b.bad = true
   638  		b.Src = nil
   639  		return nil
   640  	}
   641  	r := b.Src[:l:l]
   642  	b.Src = b.Src[l:]
   643  	return r
   644  }
   645  
   646  // UnsafeString returns a Kafka string from the reader without allocating using
   647  // the unsafe package. This must be used with care; note the string holds a
   648  // reference to the original slice.
   649  func (b *Reader) UnsafeString() string {
   650  	l := b.Int16()
   651  	return UnsafeString(b.Span(int(l)))
   652  }
   653  
   654  // String returns a Kafka string from the reader.
   655  func (b *Reader) String() string {
   656  	l := b.Int16()
   657  	return string(b.Span(int(l)))
   658  }
   659  
   660  // UnsafeCompactString returns a Kafka compact string from the reader without
   661  // allocating using the unsafe package. This must be used with care; note the
   662  // string holds a reference to the original slice.
   663  func (b *Reader) UnsafeCompactString() string {
   664  	l := int(b.Uvarint()) - 1
   665  	return UnsafeString(b.Span(l))
   666  }
   667  
   668  // CompactString returns a Kafka compact string from the reader.
   669  func (b *Reader) CompactString() string {
   670  	l := int(b.Uvarint()) - 1
   671  	return string(b.Span(l))
   672  }
   673  
   674  // UnsafeNullableString returns a Kafka nullable string from the reader without
   675  // allocating using the unsafe package. This must be used with care; note the
   676  // string holds a reference to the original slice.
   677  func (b *Reader) UnsafeNullableString() *string {
   678  	l := b.Int16()
   679  	if l < 0 {
   680  		return nil
   681  	}
   682  	s := UnsafeString(b.Span(int(l)))
   683  	return &s
   684  }
   685  
   686  // NullableString returns a Kafka nullable string from the reader.
   687  func (b *Reader) NullableString() *string {
   688  	l := b.Int16()
   689  	if l < 0 {
   690  		return nil
   691  	}
   692  	s := string(b.Span(int(l)))
   693  	return &s
   694  }
   695  
   696  // UnsafeCompactNullableString returns a Kafka compact nullable string from the
   697  // reader without allocating using the unsafe package. This must be used with
   698  // care; note the string holds a reference to the original slice.
   699  func (b *Reader) UnsafeCompactNullableString() *string {
   700  	l := int(b.Uvarint()) - 1
   701  	if l < 0 {
   702  		return nil
   703  	}
   704  	s := UnsafeString(b.Span(l))
   705  	return &s
   706  }
   707  
   708  // CompactNullableString returns a Kafka compact nullable string from the
   709  // reader.
   710  func (b *Reader) CompactNullableString() *string {
   711  	l := int(b.Uvarint()) - 1
   712  	if l < 0 {
   713  		return nil
   714  	}
   715  	s := string(b.Span(l))
   716  	return &s
   717  }
   718  
   719  // Bytes returns a Kafka byte array from the reader.
   720  //
   721  // This never returns nil.
   722  func (b *Reader) Bytes() []byte {
   723  	l := b.Int32()
   724  	// This is not to spec, but it is not clearly documented and Microsoft
   725  	// EventHubs fails here. -1 means null, which should throw an
   726  	// exception. EventHubs uses -1 to mean "does not exist" on some
   727  	// non-nullable fields.
   728  	//
   729  	// Until EventHubs is fixed, we return an empty byte slice for null.
   730  	if l == -1 {
   731  		return []byte{}
   732  	}
   733  	return b.Span(int(l))
   734  }
   735  
   736  // CompactBytes returns a Kafka compact byte array from the reader.
   737  //
   738  // This never returns nil.
   739  func (b *Reader) CompactBytes() []byte {
   740  	l := int(b.Uvarint()) - 1
   741  	if l == -1 { // same as above: -1 should not be allowed here
   742  		return []byte{}
   743  	}
   744  	return b.Span(l)
   745  }
   746  
   747  // NullableBytes returns a Kafka nullable byte array from the reader, returning
   748  // nil as appropriate.
   749  func (b *Reader) NullableBytes() []byte {
   750  	l := b.Int32()
   751  	if l < 0 {
   752  		return nil
   753  	}
   754  	r := b.Span(int(l))
   755  	return r
   756  }
   757  
   758  // CompactNullableBytes returns a Kafka compact nullable byte array from the
   759  // reader, returning nil as appropriate.
   760  func (b *Reader) CompactNullableBytes() []byte {
   761  	l := int(b.Uvarint()) - 1
   762  	if l < 0 {
   763  		return nil
   764  	}
   765  	r := b.Span(l)
   766  	return r
   767  }
   768  
   769  // ArrayLen returns a Kafka array length from the reader.
   770  func (b *Reader) ArrayLen() int32 {
   771  	r := b.Int32()
   772  	// The min size of a Kafka type is a byte, so if we do not have
   773  	// at least the array length of bytes left, it is bad.
   774  	if len(b.Src) < int(r) {
   775  		b.bad = true
   776  		b.Src = nil
   777  		return 0
   778  	}
   779  	return r
   780  }
   781  
   782  // VarintArrayLen returns a Kafka array length from the reader.
   783  func (b *Reader) VarintArrayLen() int32 {
   784  	r := b.Varint()
   785  	// The min size of a Kafka type is a byte, so if we do not have
   786  	// at least the array length of bytes left, it is bad.
   787  	if len(b.Src) < int(r) {
   788  		b.bad = true
   789  		b.Src = nil
   790  		return 0
   791  	}
   792  	return r
   793  }
   794  
   795  // CompactArrayLen returns a Kafka compact array length from the reader.
   796  func (b *Reader) CompactArrayLen() int32 {
   797  	r := int32(b.Uvarint()) - 1
   798  	// The min size of a Kafka type is a byte, so if we do not have
   799  	// at least the array length of bytes left, it is bad.
   800  	if len(b.Src) < int(r) {
   801  		b.bad = true
   802  		b.Src = nil
   803  		return 0
   804  	}
   805  	return r
   806  }
   807  
   808  // VarintBytes returns a Kafka encoded varint array from the reader, returning
   809  // nil as appropriate.
   810  func (b *Reader) VarintBytes() []byte {
   811  	l := b.Varint()
   812  	if l < 0 {
   813  		return nil
   814  	}
   815  	return b.Span(int(l))
   816  }
   817  
   818  // UnsafeVarintString returns a Kafka encoded varint string from the reader
   819  // without allocating using the unsafe package. This must be used with care;
   820  // note the string holds a reference to the original slice.
   821  func (b *Reader) UnsafeVarintString() string {
   822  	return UnsafeString(b.VarintBytes())
   823  }
   824  
   825  // VarintString returns a Kafka encoded varint string from the reader.
   826  func (b *Reader) VarintString() string {
   827  	return string(b.VarintBytes())
   828  }
   829  
   830  // Complete returns ErrNotEnoughData if the source ran out while decoding.
   831  func (b *Reader) Complete() error {
   832  	if b.bad {
   833  		return ErrNotEnoughData
   834  	}
   835  	return nil
   836  }
   837  
   838  // Ok returns true if the reader is still ok.
   839  func (b *Reader) Ok() bool {
   840  	return !b.bad
   841  }
   842  
// UnsafeString returns the slice as a string using unsafe rule (6).
//
// No bytes are copied: the returned string's data pointer and length are
// taken straight from the slice header, so the string shares the slice's
// backing memory.
func UnsafeString(slice []byte) string {
	var str string
	// View the string's own header so its fields can be filled in place.
	strhdr := (*reflect.StringHeader)(unsafe.Pointer(&str))             //nolint:gosec // known way to convert slice to string
	// Point the string at the slice's backing array.
	strhdr.Data = ((*reflect.SliceHeader)(unsafe.Pointer(&slice))).Data //nolint:gosec // known way to convert slice to string
	strhdr.Len = len(slice)
	return str
}
   851  

View as plain text