...

Source file src/github.com/twmb/franz-go/pkg/kbin/primitives.go

Documentation: github.com/twmb/franz-go/pkg/kbin

     1  // Package kbin contains Kafka primitive reading and writing functions.
     2  package kbin
     3  
     4  import (
     5  	"encoding/binary"
     6  	"errors"
     7  	"math"
     8  	"math/bits"
     9  	"reflect"
    10  	"unsafe"
    11  )
    12  
    13  // This file contains primitive type encoding and decoding.
    14  //
    15  // The Reader helper can be used even when content runs out
    16  // or an error is hit; all other number requests will return
    17  // zero so a decode will basically no-op.
    18  
    19  // ErrNotEnoughData is returned when a type could not fully decode
    20  // from a slice because the slice did not have enough data.
    21  var ErrNotEnoughData = errors.New("response did not contain enough data to be valid")
    22  
    23  // AppendBool appends 1 for true or 0 for false to dst.
    24  func AppendBool(dst []byte, v bool) []byte {
    25  	if v {
    26  		return append(dst, 1)
    27  	}
    28  	return append(dst, 0)
    29  }
    30  
    31  // AppendInt8 appends an int8 to dst.
    32  func AppendInt8(dst []byte, i int8) []byte {
    33  	return append(dst, byte(i))
    34  }
    35  
    36  // AppendInt16 appends a big endian int16 to dst.
    37  func AppendInt16(dst []byte, i int16) []byte {
    38  	return AppendUint16(dst, uint16(i))
    39  }
    40  
    41  // AppendUint16 appends a big endian uint16 to dst.
    42  func AppendUint16(dst []byte, u uint16) []byte {
    43  	return append(dst, byte(u>>8), byte(u))
    44  }
    45  
    46  // AppendInt32 appends a big endian int32 to dst.
    47  func AppendInt32(dst []byte, i int32) []byte {
    48  	return AppendUint32(dst, uint32(i))
    49  }
    50  
    51  // AppendInt64 appends a big endian int64 to dst.
    52  func AppendInt64(dst []byte, i int64) []byte {
    53  	return appendUint64(dst, uint64(i))
    54  }
    55  
    56  // AppendFloat64 appends a big endian float64 to dst.
    57  func AppendFloat64(dst []byte, f float64) []byte {
    58  	return appendUint64(dst, math.Float64bits(f))
    59  }
    60  
    61  // AppendUuid appends the 16 uuid bytes to dst.
    62  func AppendUuid(dst []byte, uuid [16]byte) []byte {
    63  	return append(dst, uuid[:]...)
    64  }
    65  
    66  func appendUint64(dst []byte, u uint64) []byte {
    67  	return append(dst, byte(u>>56), byte(u>>48), byte(u>>40), byte(u>>32),
    68  		byte(u>>24), byte(u>>16), byte(u>>8), byte(u))
    69  }
    70  
    71  // AppendUint32 appends a big endian uint32 to dst.
    72  func AppendUint32(dst []byte, u uint32) []byte {
    73  	return append(dst, byte(u>>24), byte(u>>16), byte(u>>8), byte(u))
    74  }
    75  
    76  // uvarintLens could only be length 65, but using 256 allows bounds check
    77  // elimination on lookup.
    78  const uvarintLens = "\x01\x01\x01\x01\x01\x01\x01\x01\x02\x02\x02\x02\x02\x02\x02\x03\x03\x03\x03\x03\x03\x03\x04\x04\x04\x04\x04\x04\x04\x05\x05\x05\x05\x05\x05\x05\x06\x06\x06\x06\x06\x06\x06\x07\x07\x07\x07\x07\x07\x07\x08\x08\x08\x08\x08\x08\x08\x09\x09\x09\x09\x09\x09\x09\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    79  
    80  // VarintLen returns how long i would be if it were varint encoded.
    81  func VarintLen(i int32) int {
    82  	u := uint32(i)<<1 ^ uint32(i>>31)
    83  	return UvarintLen(u)
    84  }
    85  
    86  // UvarintLen returns how long u would be if it were uvarint encoded.
    87  func UvarintLen(u uint32) int {
    88  	return int(uvarintLens[byte(bits.Len32(u))])
    89  }
    90  
    91  // VarlongLen returns how long i would be if it were varlong encoded.
    92  func VarlongLen(i int64) int {
    93  	u := uint64(i)<<1 ^ uint64(i>>63)
    94  	return uvarlongLen(u)
    95  }
    96  
    97  func uvarlongLen(u uint64) int {
    98  	return int(uvarintLens[byte(bits.Len64(u))])
    99  }
   100  
   101  // Varint is a loop unrolled 32 bit varint decoder. The return semantics
   102  // are the same as binary.Varint, with the added benefit that overflows
   103  // in 5 byte encodings are handled rather than left to the user.
   104  func Varint(in []byte) (int32, int) {
   105  	x, n := Uvarint(in)
   106  	return int32((x >> 1) ^ -(x & 1)), n
   107  }
   108  
   109  // Uvarint is a loop unrolled 32 bit uvarint decoder. The return semantics
   110  // are the same as binary.Uvarint, with the added benefit that overflows
   111  // in 5 byte encodings are handled rather than left to the user.
   112  func Uvarint(in []byte) (uint32, int) {
   113  	var x uint32
   114  	var overflow int
   115  
   116  	if len(in) < 1 {
   117  		goto fail
   118  	}
   119  
   120  	x = uint32(in[0] & 0x7f)
   121  	if in[0]&0x80 == 0 {
   122  		return x, 1
   123  	} else if len(in) < 2 {
   124  		goto fail
   125  	}
   126  
   127  	x |= uint32(in[1]&0x7f) << 7
   128  	if in[1]&0x80 == 0 {
   129  		return x, 2
   130  	} else if len(in) < 3 {
   131  		goto fail
   132  	}
   133  
   134  	x |= uint32(in[2]&0x7f) << 14
   135  	if in[2]&0x80 == 0 {
   136  		return x, 3
   137  	} else if len(in) < 4 {
   138  		goto fail
   139  	}
   140  
   141  	x |= uint32(in[3]&0x7f) << 21
   142  	if in[3]&0x80 == 0 {
   143  		return x, 4
   144  	} else if len(in) < 5 {
   145  		goto fail
   146  	}
   147  
   148  	x |= uint32(in[4]) << 28
   149  	if in[4] <= 0x0f {
   150  		return x, 5
   151  	}
   152  
   153  	overflow = -5
   154  
   155  fail:
   156  	return 0, overflow
   157  }
   158  
   159  // Varlong is a loop unrolled 64 bit varint decoder. The return semantics
   160  // are the same as binary.Varint, with the added benefit that overflows
   161  // in 10 byte encodings are handled rather than left to the user.
   162  func Varlong(in []byte) (int64, int) {
   163  	x, n := uvarlong(in)
   164  	return int64((x >> 1) ^ -(x & 1)), n
   165  }
   166  
   167  func uvarlong(in []byte) (uint64, int) {
   168  	var x uint64
   169  	var overflow int
   170  
   171  	if len(in) < 1 {
   172  		goto fail
   173  	}
   174  
   175  	x = uint64(in[0] & 0x7f)
   176  	if in[0]&0x80 == 0 {
   177  		return x, 1
   178  	} else if len(in) < 2 {
   179  		goto fail
   180  	}
   181  
   182  	x |= uint64(in[1]&0x7f) << 7
   183  	if in[1]&0x80 == 0 {
   184  		return x, 2
   185  	} else if len(in) < 3 {
   186  		goto fail
   187  	}
   188  
   189  	x |= uint64(in[2]&0x7f) << 14
   190  	if in[2]&0x80 == 0 {
   191  		return x, 3
   192  	} else if len(in) < 4 {
   193  		goto fail
   194  	}
   195  
   196  	x |= uint64(in[3]&0x7f) << 21
   197  	if in[3]&0x80 == 0 {
   198  		return x, 4
   199  	} else if len(in) < 5 {
   200  		goto fail
   201  	}
   202  
   203  	x |= uint64(in[4]&0x7f) << 28
   204  	if in[4]&0x80 == 0 {
   205  		return x, 5
   206  	} else if len(in) < 6 {
   207  		goto fail
   208  	}
   209  
   210  	x |= uint64(in[5]&0x7f) << 35
   211  	if in[5]&0x80 == 0 {
   212  		return x, 6
   213  	} else if len(in) < 7 {
   214  		goto fail
   215  	}
   216  
   217  	x |= uint64(in[6]&0x7f) << 42
   218  	if in[6]&0x80 == 0 {
   219  		return x, 7
   220  	} else if len(in) < 8 {
   221  		goto fail
   222  	}
   223  
   224  	x |= uint64(in[7]&0x7f) << 49
   225  	if in[7]&0x80 == 0 {
   226  		return x, 8
   227  	} else if len(in) < 9 {
   228  		goto fail
   229  	}
   230  
   231  	x |= uint64(in[8]&0x7f) << 56
   232  	if in[8]&0x80 == 0 {
   233  		return x, 9
   234  	} else if len(in) < 10 {
   235  		goto fail
   236  	}
   237  
   238  	x |= uint64(in[9]) << 63
   239  	if in[9] <= 0x01 {
   240  		return x, 10
   241  	}
   242  
   243  	overflow = -10
   244  
   245  fail:
   246  	return 0, overflow
   247  }
   248  
   249  // AppendVarint appends a varint encoded i to dst.
   250  func AppendVarint(dst []byte, i int32) []byte {
   251  	return AppendUvarint(dst, uint32(i)<<1^uint32(i>>31))
   252  }
   253  
   254  // AppendUvarint appends a uvarint encoded u to dst.
   255  func AppendUvarint(dst []byte, u uint32) []byte {
   256  	switch UvarintLen(u) {
   257  	case 5:
   258  		return append(dst,
   259  			byte(u&0x7f|0x80),
   260  			byte((u>>7)&0x7f|0x80),
   261  			byte((u>>14)&0x7f|0x80),
   262  			byte((u>>21)&0x7f|0x80),
   263  			byte(u>>28))
   264  	case 4:
   265  		return append(dst,
   266  			byte(u&0x7f|0x80),
   267  			byte((u>>7)&0x7f|0x80),
   268  			byte((u>>14)&0x7f|0x80),
   269  			byte(u>>21))
   270  	case 3:
   271  		return append(dst,
   272  			byte(u&0x7f|0x80),
   273  			byte((u>>7)&0x7f|0x80),
   274  			byte(u>>14))
   275  	case 2:
   276  		return append(dst,
   277  			byte(u&0x7f|0x80),
   278  			byte(u>>7))
   279  	case 1:
   280  		return append(dst, byte(u))
   281  	}
   282  	return dst
   283  }
   284  
   285  // AppendVarlong appends a varint encoded i to dst.
   286  func AppendVarlong(dst []byte, i int64) []byte {
   287  	return appendUvarlong(dst, uint64(i)<<1^uint64(i>>63))
   288  }
   289  
   290  func appendUvarlong(dst []byte, u uint64) []byte {
   291  	switch uvarlongLen(u) {
   292  	case 10:
   293  		return append(dst,
   294  			byte(u&0x7f|0x80),
   295  			byte((u>>7)&0x7f|0x80),
   296  			byte((u>>14)&0x7f|0x80),
   297  			byte((u>>21)&0x7f|0x80),
   298  			byte((u>>28)&0x7f|0x80),
   299  			byte((u>>35)&0x7f|0x80),
   300  			byte((u>>42)&0x7f|0x80),
   301  			byte((u>>49)&0x7f|0x80),
   302  			byte((u>>56)&0x7f|0x80),
   303  			byte(u>>63))
   304  	case 9:
   305  		return append(dst,
   306  			byte(u&0x7f|0x80),
   307  			byte((u>>7)&0x7f|0x80),
   308  			byte((u>>14)&0x7f|0x80),
   309  			byte((u>>21)&0x7f|0x80),
   310  			byte((u>>28)&0x7f|0x80),
   311  			byte((u>>35)&0x7f|0x80),
   312  			byte((u>>42)&0x7f|0x80),
   313  			byte((u>>49)&0x7f|0x80),
   314  			byte(u>>56))
   315  	case 8:
   316  		return append(dst,
   317  			byte(u&0x7f|0x80),
   318  			byte((u>>7)&0x7f|0x80),
   319  			byte((u>>14)&0x7f|0x80),
   320  			byte((u>>21)&0x7f|0x80),
   321  			byte((u>>28)&0x7f|0x80),
   322  			byte((u>>35)&0x7f|0x80),
   323  			byte((u>>42)&0x7f|0x80),
   324  			byte(u>>49))
   325  	case 7:
   326  		return append(dst,
   327  			byte(u&0x7f|0x80),
   328  			byte((u>>7)&0x7f|0x80),
   329  			byte((u>>14)&0x7f|0x80),
   330  			byte((u>>21)&0x7f|0x80),
   331  			byte((u>>28)&0x7f|0x80),
   332  			byte((u>>35)&0x7f|0x80),
   333  			byte(u>>42))
   334  	case 6:
   335  		return append(dst,
   336  			byte(u&0x7f|0x80),
   337  			byte((u>>7)&0x7f|0x80),
   338  			byte((u>>14)&0x7f|0x80),
   339  			byte((u>>21)&0x7f|0x80),
   340  			byte((u>>28)&0x7f|0x80),
   341  			byte(u>>35))
   342  	case 5:
   343  		return append(dst,
   344  			byte(u&0x7f|0x80),
   345  			byte((u>>7)&0x7f|0x80),
   346  			byte((u>>14)&0x7f|0x80),
   347  			byte((u>>21)&0x7f|0x80),
   348  			byte(u>>28))
   349  	case 4:
   350  		return append(dst,
   351  			byte(u&0x7f|0x80),
   352  			byte((u>>7)&0x7f|0x80),
   353  			byte((u>>14)&0x7f|0x80),
   354  			byte(u>>21))
   355  	case 3:
   356  		return append(dst,
   357  			byte(u&0x7f|0x80),
   358  			byte((u>>7)&0x7f|0x80),
   359  			byte(u>>14))
   360  	case 2:
   361  		return append(dst,
   362  			byte(u&0x7f|0x80),
   363  			byte(u>>7))
   364  	case 1:
   365  		return append(dst, byte(u))
   366  	}
   367  	return dst
   368  }
   369  
   370  // AppendString appends a string to dst prefixed with its int16 length.
   371  func AppendString(dst []byte, s string) []byte {
   372  	dst = AppendInt16(dst, int16(len(s)))
   373  	return append(dst, s...)
   374  }
   375  
   376  // AppendCompactString appends a string to dst prefixed with its uvarint length
   377  // starting at 1; 0 is reserved for null, which compact strings are not
   378  // (nullable compact ones are!). Thus, the length is the decoded uvarint - 1.
   379  //
   380  // For KIP-482.
   381  func AppendCompactString(dst []byte, s string) []byte {
   382  	dst = AppendUvarint(dst, 1+uint32(len(s)))
   383  	return append(dst, s...)
   384  }
   385  
   386  // AppendNullableString appends potentially nil string to dst prefixed with its
   387  // int16 length or int16(-1) if nil.
   388  func AppendNullableString(dst []byte, s *string) []byte {
   389  	if s == nil {
   390  		return AppendInt16(dst, -1)
   391  	}
   392  	return AppendString(dst, *s)
   393  }
   394  
   395  // AppendCompactNullableString appends a potentially nil string to dst with its
   396  // uvarint length starting at 1, with 0 indicating null. Thus, the length is
   397  // the decoded uvarint - 1.
   398  //
   399  // For KIP-482.
   400  func AppendCompactNullableString(dst []byte, s *string) []byte {
   401  	if s == nil {
   402  		return AppendUvarint(dst, 0)
   403  	}
   404  	return AppendCompactString(dst, *s)
   405  }
   406  
   407  // AppendBytes appends bytes to dst prefixed with its int32 length.
   408  func AppendBytes(dst, b []byte) []byte {
   409  	dst = AppendInt32(dst, int32(len(b)))
   410  	return append(dst, b...)
   411  }
   412  
   413  // AppendCompactBytes appends bytes to dst prefixed with a its uvarint length
   414  // starting at 1; 0 is reserved for null, which compact bytes are not (nullable
   415  // compact ones are!). Thus, the length is the decoded uvarint - 1.
   416  //
   417  // For KIP-482.
   418  func AppendCompactBytes(dst, b []byte) []byte {
   419  	dst = AppendUvarint(dst, 1+uint32(len(b)))
   420  	return append(dst, b...)
   421  }
   422  
   423  // AppendNullableBytes appends a potentially nil slice to dst prefixed with its
   424  // int32 length or int32(-1) if nil.
   425  func AppendNullableBytes(dst, b []byte) []byte {
   426  	if b == nil {
   427  		return AppendInt32(dst, -1)
   428  	}
   429  	return AppendBytes(dst, b)
   430  }
   431  
   432  // AppendCompactNullableBytes appends a potentially nil slice to dst with its
   433  // uvarint length starting at 1, with 0 indicating null. Thus, the length is
   434  // the decoded uvarint - 1.
   435  //
   436  // For KIP-482.
   437  func AppendCompactNullableBytes(dst, b []byte) []byte {
   438  	if b == nil {
   439  		return AppendUvarint(dst, 0)
   440  	}
   441  	return AppendCompactBytes(dst, b)
   442  }
   443  
   444  // AppendVarintString appends a string to dst prefixed with its length encoded
   445  // as a varint.
   446  func AppendVarintString(dst []byte, s string) []byte {
   447  	dst = AppendVarint(dst, int32(len(s)))
   448  	return append(dst, s...)
   449  }
   450  
   451  // AppendVarintBytes appends a slice to dst prefixed with its length encoded as
   452  // a varint.
   453  func AppendVarintBytes(dst, b []byte) []byte {
   454  	if b == nil {
   455  		return AppendVarint(dst, -1)
   456  	}
   457  	dst = AppendVarint(dst, int32(len(b)))
   458  	return append(dst, b...)
   459  }
   460  
   461  // AppendArrayLen appends the length of an array as an int32 to dst.
   462  func AppendArrayLen(dst []byte, l int) []byte {
   463  	return AppendInt32(dst, int32(l))
   464  }
   465  
   466  // AppendCompactArrayLen appends the length of an array as a uvarint to dst
   467  // as the length + 1.
   468  //
   469  // For KIP-482.
   470  func AppendCompactArrayLen(dst []byte, l int) []byte {
   471  	return AppendUvarint(dst, 1+uint32(l))
   472  }
   473  
   474  // AppendNullableArrayLen appends the length of an array as an int32 to dst,
   475  // or -1 if isNil is true.
   476  func AppendNullableArrayLen(dst []byte, l int, isNil bool) []byte {
   477  	if isNil {
   478  		return AppendInt32(dst, -1)
   479  	}
   480  	return AppendInt32(dst, int32(l))
   481  }
   482  
   483  // AppendCompactNullableArrayLen appends the length of an array as a uvarint to
   484  // dst as the length + 1; if isNil is true, this appends 0 as a uvarint.
   485  //
   486  // For KIP-482.
   487  func AppendCompactNullableArrayLen(dst []byte, l int, isNil bool) []byte {
   488  	if isNil {
   489  		return AppendUvarint(dst, 0)
   490  	}
   491  	return AppendUvarint(dst, 1+uint32(l))
   492  }
   493  
   494  // Reader is used to decode Kafka messages.
   495  //
   496  // For all functions on Reader, if the reader has been invalidated, functions
   497  // return defaults (false, 0, nil, ""). Use Complete to detect if the reader
   498  // was invalidated or if the reader has remaining data.
   499  type Reader struct {
   500  	Src []byte
   501  	bad bool
   502  }
   503  
   504  // Bool returns a bool from the reader.
   505  func (b *Reader) Bool() bool {
   506  	if len(b.Src) < 1 {
   507  		b.bad = true
   508  		b.Src = nil
   509  		return false
   510  	}
   511  	t := b.Src[0] != 0 // if '0', false
   512  	b.Src = b.Src[1:]
   513  	return t
   514  }
   515  
   516  // Int8 returns an int8 from the reader.
   517  func (b *Reader) Int8() int8 {
   518  	if len(b.Src) < 1 {
   519  		b.bad = true
   520  		b.Src = nil
   521  		return 0
   522  	}
   523  	r := b.Src[0]
   524  	b.Src = b.Src[1:]
   525  	return int8(r)
   526  }
   527  
   528  // Int16 returns an int16 from the reader.
   529  func (b *Reader) Int16() int16 {
   530  	if len(b.Src) < 2 {
   531  		b.bad = true
   532  		b.Src = nil
   533  		return 0
   534  	}
   535  	r := int16(binary.BigEndian.Uint16(b.Src))
   536  	b.Src = b.Src[2:]
   537  	return r
   538  }
   539  
   540  // Uint16 returns an uint16 from the reader.
   541  func (b *Reader) Uint16() uint16 {
   542  	if len(b.Src) < 2 {
   543  		b.bad = true
   544  		b.Src = nil
   545  		return 0
   546  	}
   547  	r := binary.BigEndian.Uint16(b.Src)
   548  	b.Src = b.Src[2:]
   549  	return r
   550  }
   551  
   552  // Int32 returns an int32 from the reader.
   553  func (b *Reader) Int32() int32 {
   554  	if len(b.Src) < 4 {
   555  		b.bad = true
   556  		b.Src = nil
   557  		return 0
   558  	}
   559  	r := int32(binary.BigEndian.Uint32(b.Src))
   560  	b.Src = b.Src[4:]
   561  	return r
   562  }
   563  
   564  // Int64 returns an int64 from the reader.
   565  func (b *Reader) Int64() int64 {
   566  	return int64(b.readUint64())
   567  }
   568  
   569  // Uuid returns a uuid from the reader.
   570  func (b *Reader) Uuid() [16]byte {
   571  	var r [16]byte
   572  	copy(r[:], b.Span(16))
   573  	return r
   574  }
   575  
   576  // Float64 returns a float64 from the reader.
   577  func (b *Reader) Float64() float64 {
   578  	return math.Float64frombits(b.readUint64())
   579  }
   580  
   581  func (b *Reader) readUint64() uint64 {
   582  	if len(b.Src) < 8 {
   583  		b.bad = true
   584  		b.Src = nil
   585  		return 0
   586  	}
   587  	r := binary.BigEndian.Uint64(b.Src)
   588  	b.Src = b.Src[8:]
   589  	return r
   590  }
   591  
   592  // Uint32 returns a uint32 from the reader.
   593  func (b *Reader) Uint32() uint32 {
   594  	if len(b.Src) < 4 {
   595  		b.bad = true
   596  		b.Src = nil
   597  		return 0
   598  	}
   599  	r := binary.BigEndian.Uint32(b.Src)
   600  	b.Src = b.Src[4:]
   601  	return r
   602  }
   603  
   604  // Varint returns a varint int32 from the reader.
   605  func (b *Reader) Varint() int32 {
   606  	val, n := Varint(b.Src)
   607  	if n <= 0 {
   608  		b.bad = true
   609  		b.Src = nil
   610  		return 0
   611  	}
   612  	b.Src = b.Src[n:]
   613  	return val
   614  }
   615  
   616  // Varlong returns a varlong int64 from the reader.
   617  func (b *Reader) Varlong() int64 {
   618  	val, n := Varlong(b.Src)
   619  	if n <= 0 {
   620  		b.bad = true
   621  		b.Src = nil
   622  		return 0
   623  	}
   624  	b.Src = b.Src[n:]
   625  	return val
   626  }
   627  
   628  // Uvarint returns a uvarint encoded uint32 from the reader.
   629  func (b *Reader) Uvarint() uint32 {
   630  	val, n := Uvarint(b.Src)
   631  	if n <= 0 {
   632  		b.bad = true
   633  		b.Src = nil
   634  		return 0
   635  	}
   636  	b.Src = b.Src[n:]
   637  	return val
   638  }
   639  
   640  // Span returns l bytes from the reader.
   641  func (b *Reader) Span(l int) []byte {
   642  	if len(b.Src) < l || l < 0 {
   643  		b.bad = true
   644  		b.Src = nil
   645  		return nil
   646  	}
   647  	r := b.Src[:l:l]
   648  	b.Src = b.Src[l:]
   649  	return r
   650  }
   651  
   652  // UnsafeString returns a Kafka string from the reader without allocating using
   653  // the unsafe package. This must be used with care; note the string holds a
   654  // reference to the original slice.
   655  func (b *Reader) UnsafeString() string {
   656  	l := b.Int16()
   657  	return UnsafeString(b.Span(int(l)))
   658  }
   659  
   660  // String returns a Kafka string from the reader.
   661  func (b *Reader) String() string {
   662  	l := b.Int16()
   663  	return string(b.Span(int(l)))
   664  }
   665  
   666  // UnsafeCompactString returns a Kafka compact string from the reader without
   667  // allocating using the unsafe package. This must be used with care; note the
   668  // string holds a reference to the original slice.
   669  func (b *Reader) UnsafeCompactString() string {
   670  	l := int(b.Uvarint()) - 1
   671  	return UnsafeString(b.Span(l))
   672  }
   673  
   674  // CompactString returns a Kafka compact string from the reader.
   675  func (b *Reader) CompactString() string {
   676  	l := int(b.Uvarint()) - 1
   677  	return string(b.Span(l))
   678  }
   679  
   680  // UnsafeNullableString returns a Kafka nullable string from the reader without
   681  // allocating using the unsafe package. This must be used with care; note the
   682  // string holds a reference to the original slice.
   683  func (b *Reader) UnsafeNullableString() *string {
   684  	l := b.Int16()
   685  	if l < 0 {
   686  		return nil
   687  	}
   688  	s := UnsafeString(b.Span(int(l)))
   689  	return &s
   690  }
   691  
   692  // NullableString returns a Kafka nullable string from the reader.
   693  func (b *Reader) NullableString() *string {
   694  	l := b.Int16()
   695  	if l < 0 {
   696  		return nil
   697  	}
   698  	s := string(b.Span(int(l)))
   699  	return &s
   700  }
   701  
   702  // UnsafeCompactNullableString returns a Kafka compact nullable string from the
   703  // reader without allocating using the unsafe package. This must be used with
   704  // care; note the string holds a reference to the original slice.
   705  func (b *Reader) UnsafeCompactNullableString() *string {
   706  	l := int(b.Uvarint()) - 1
   707  	if l < 0 {
   708  		return nil
   709  	}
   710  	s := UnsafeString(b.Span(l))
   711  	return &s
   712  }
   713  
   714  // CompactNullableString returns a Kafka compact nullable string from the
   715  // reader.
   716  func (b *Reader) CompactNullableString() *string {
   717  	l := int(b.Uvarint()) - 1
   718  	if l < 0 {
   719  		return nil
   720  	}
   721  	s := string(b.Span(l))
   722  	return &s
   723  }
   724  
   725  // Bytes returns a Kafka byte array from the reader.
   726  //
   727  // This never returns nil.
   728  func (b *Reader) Bytes() []byte {
   729  	l := b.Int32()
   730  	// This is not to spec, but it is not clearly documented and Microsoft
   731  	// EventHubs fails here. -1 means null, which should throw an
   732  	// exception. EventHubs uses -1 to mean "does not exist" on some
   733  	// non-nullable fields.
   734  	//
   735  	// Until EventHubs is fixed, we return an empty byte slice for null.
   736  	if l == -1 {
   737  		return []byte{}
   738  	}
   739  	return b.Span(int(l))
   740  }
   741  
   742  // CompactBytes returns a Kafka compact byte array from the reader.
   743  //
   744  // This never returns nil.
   745  func (b *Reader) CompactBytes() []byte {
   746  	l := int(b.Uvarint()) - 1
   747  	if l == -1 { // same as above: -1 should not be allowed here
   748  		return []byte{}
   749  	}
   750  	return b.Span(l)
   751  }
   752  
   753  // NullableBytes returns a Kafka nullable byte array from the reader, returning
   754  // nil as appropriate.
   755  func (b *Reader) NullableBytes() []byte {
   756  	l := b.Int32()
   757  	if l < 0 {
   758  		return nil
   759  	}
   760  	r := b.Span(int(l))
   761  	return r
   762  }
   763  
   764  // CompactNullableBytes returns a Kafka compact nullable byte array from the
   765  // reader, returning nil as appropriate.
   766  func (b *Reader) CompactNullableBytes() []byte {
   767  	l := int(b.Uvarint()) - 1
   768  	if l < 0 {
   769  		return nil
   770  	}
   771  	r := b.Span(l)
   772  	return r
   773  }
   774  
   775  // ArrayLen returns a Kafka array length from the reader.
   776  func (b *Reader) ArrayLen() int32 {
   777  	r := b.Int32()
   778  	// The min size of a Kafka type is a byte, so if we do not have
   779  	// at least the array length of bytes left, it is bad.
   780  	if len(b.Src) < int(r) {
   781  		b.bad = true
   782  		b.Src = nil
   783  		return 0
   784  	}
   785  	return r
   786  }
   787  
   788  // VarintArrayLen returns a Kafka array length from the reader.
   789  func (b *Reader) VarintArrayLen() int32 {
   790  	r := b.Varint()
   791  	// The min size of a Kafka type is a byte, so if we do not have
   792  	// at least the array length of bytes left, it is bad.
   793  	if len(b.Src) < int(r) {
   794  		b.bad = true
   795  		b.Src = nil
   796  		return 0
   797  	}
   798  	return r
   799  }
   800  
   801  // CompactArrayLen returns a Kafka compact array length from the reader.
   802  func (b *Reader) CompactArrayLen() int32 {
   803  	r := int32(b.Uvarint()) - 1
   804  	// The min size of a Kafka type is a byte, so if we do not have
   805  	// at least the array length of bytes left, it is bad.
   806  	if len(b.Src) < int(r) {
   807  		b.bad = true
   808  		b.Src = nil
   809  		return 0
   810  	}
   811  	return r
   812  }
   813  
   814  // VarintBytes returns a Kafka encoded varint array from the reader, returning
   815  // nil as appropriate.
   816  func (b *Reader) VarintBytes() []byte {
   817  	l := b.Varint()
   818  	if l < 0 {
   819  		return nil
   820  	}
   821  	return b.Span(int(l))
   822  }
   823  
   824  // UnsafeVarintString returns a Kafka encoded varint string from the reader
   825  // without allocating using the unsafe package. This must be used with care;
   826  // note the string holds a reference to the original slice.
   827  func (b *Reader) UnsafeVarintString() string {
   828  	return UnsafeString(b.VarintBytes())
   829  }
   830  
   831  // VarintString returns a Kafka encoded varint string from the reader.
   832  func (b *Reader) VarintString() string {
   833  	return string(b.VarintBytes())
   834  }
   835  
   836  // Complete returns ErrNotEnoughData if the source ran out while decoding.
   837  func (b *Reader) Complete() error {
   838  	if b.bad {
   839  		return ErrNotEnoughData
   840  	}
   841  	return nil
   842  }
   843  
   844  // Ok returns true if the reader is still ok.
   845  func (b *Reader) Ok() bool {
   846  	return !b.bad
   847  }
   848  
   849  // UnsafeString returns the slice as a string using unsafe rule (6).
   850  func UnsafeString(slice []byte) string {
   851  	var str string
   852  	strhdr := (*reflect.StringHeader)(unsafe.Pointer(&str))             //nolint:gosec // known way to convert slice to string
   853  	strhdr.Data = ((*reflect.SliceHeader)(unsafe.Pointer(&slice))).Data //nolint:gosec // known way to convert slice to string
   854  	strhdr.Len = len(slice)
   855  	return str
   856  }
   857  

View as plain text