...

Source file src/github.com/apache/arrow/go/v15/parquet/internal/encoding/delta_bit_packing.go

Documentation: github.com/apache/arrow/go/v15/parquet/internal/encoding

     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package encoding
    18  
    19  import (
    20  	"bytes"
    21  	"errors"
    22  	"math"
    23  	"math/bits"
    24  	"reflect"
    25  
    26  	"github.com/apache/arrow/go/v15/arrow"
    27  	"github.com/apache/arrow/go/v15/arrow/memory"
    28  	shared_utils "github.com/apache/arrow/go/v15/internal/utils"
    29  	"github.com/apache/arrow/go/v15/parquet"
    30  	"github.com/apache/arrow/go/v15/parquet/internal/utils"
    31  )
    32  
// deltaBitPackDecoder holds the state shared by DeltaBitPackInt32Decoder and
// DeltaBitPackInt64Decoder while reading a DELTA_BINARY_PACKED page.
// See the deltaBitPack encoder for a description of the encoding format that is
// used for delta-bitpacking.
type deltaBitPackDecoder struct {
	decoder

	// mem allocates the deltaBitWidths buffer and is returned by Allocator.
	mem memory.Allocator

	// Page and block cursor state:
	//   usedFirst            - whether Decode has already emitted the header's first value
	//   bitdecoder           - reader over the page bytes handed to SetData
	//   blockSize            - values per block, read from the page header
	//   currentBlockVals     - values of the current block not yet copied out by Decode
	//   miniBlocksPerBlock   - miniblocks per block, read from the page header
	//   valsPerMini          - blockSize / miniBlocksPerBlock
	//   currentMiniBlockVals - values of the last unpacked miniblock not yet copied out
	//   minDelta             - minimum delta of the current block
	//   miniBlockIdx         - index of the next miniblock to unpack in the current block
	usedFirst            bool
	bitdecoder           *utils.BitReader
	blockSize            uint64
	currentBlockVals     uint32
	miniBlocksPerBlock   uint64
	valsPerMini          uint32
	currentMiniBlockVals uint32
	minDelta             int64
	miniBlockIdx         uint64

	// deltaBitWidths holds one bit width byte per miniblock of the current
	// block; deltaBitWidth is the width of the miniblock most recently selected.
	deltaBitWidths *memory.Buffer
	deltaBitWidth  byte

	// totalValues is the total value count from the page header. lastVal
	// starts as the header's first value and then tracks the most recently
	// decoded value, to which each subsequent delta is added.
	totalValues uint64
	lastVal     int64
}
    56  
    57  // returns the number of bytes read so far
    58  func (d *deltaBitPackDecoder) bytesRead() int64 {
    59  	return d.bitdecoder.CurOffset()
    60  }
    61  
    62  func (d *deltaBitPackDecoder) Allocator() memory.Allocator { return d.mem }
    63  
    64  // SetData sets the bytes and the expected number of values to decode
    65  // into the decoder, updating the decoder and allowing it to be reused.
    66  func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error {
    67  	// set our data into the underlying decoder for the type
    68  	if err := d.decoder.SetData(nvalues, data); err != nil {
    69  		return err
    70  	}
    71  	// create a bit reader for our decoder's values
    72  	d.bitdecoder = utils.NewBitReader(bytes.NewReader(d.data))
    73  	d.currentBlockVals = 0
    74  	d.currentMiniBlockVals = 0
    75  	if d.deltaBitWidths == nil {
    76  		d.deltaBitWidths = memory.NewResizableBuffer(d.mem)
    77  	}
    78  
    79  	var ok bool
    80  	d.blockSize, ok = d.bitdecoder.GetVlqInt()
    81  	if !ok {
    82  		return errors.New("parquet: eof exception")
    83  	}
    84  
    85  	if d.miniBlocksPerBlock, ok = d.bitdecoder.GetVlqInt(); !ok {
    86  		return errors.New("parquet: eof exception")
    87  	}
    88  	if d.miniBlocksPerBlock == 0 {
    89  		return errors.New("parquet: cannot have zero miniblock per block")
    90  	}
    91  
    92  	if d.totalValues, ok = d.bitdecoder.GetVlqInt(); !ok {
    93  		return errors.New("parquet: eof exception")
    94  	}
    95  
    96  	if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
    97  		return errors.New("parquet: eof exception")
    98  	}
    99  
   100  	d.valsPerMini = uint32(d.blockSize / d.miniBlocksPerBlock)
   101  	d.usedFirst = false
   102  	return nil
   103  }
   104  
   105  // initialize a block to decode
   106  func (d *deltaBitPackDecoder) initBlock() error {
   107  	// first we grab the min delta value that we'll start from
   108  	var ok bool
   109  	if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
   110  		return errors.New("parquet: eof exception")
   111  	}
   112  
   113  	// ensure we have enough space for our miniblocks to decode the widths
   114  	d.deltaBitWidths.Resize(int(d.miniBlocksPerBlock))
   115  
   116  	var err error
   117  	for i := uint64(0); i < d.miniBlocksPerBlock; i++ {
   118  		if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil {
   119  			return err
   120  		}
   121  	}
   122  
   123  	d.miniBlockIdx = 0
   124  	d.deltaBitWidth = d.deltaBitWidths.Bytes()[0]
   125  	d.currentBlockVals = uint32(d.blockSize)
   126  	return nil
   127  }
   128  
// DeltaBitPackInt32Decoder decodes Int32 values which are packed using the Delta BitPacking algorithm.
type DeltaBitPackInt32Decoder struct {
	*deltaBitPackDecoder

	// miniBlockValues holds the decoded values of the most recently unpacked
	// miniblock; Decode copies out of it starting where the previous copy stopped.
	miniBlockValues []int32
}
   135  
   136  func (d *DeltaBitPackInt32Decoder) unpackNextMini() error {
   137  	if d.miniBlockValues == nil {
   138  		d.miniBlockValues = make([]int32, 0, int(d.valsPerMini))
   139  	} else {
   140  		d.miniBlockValues = d.miniBlockValues[:0]
   141  	}
   142  	d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
   143  	d.currentMiniBlockVals = d.valsPerMini
   144  
   145  	for j := 0; j < int(d.valsPerMini); j++ {
   146  		delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
   147  		if !ok {
   148  			return errors.New("parquet: eof exception")
   149  		}
   150  
   151  		d.lastVal += int64(delta) + int64(d.minDelta)
   152  		d.miniBlockValues = append(d.miniBlockValues, int32(d.lastVal))
   153  	}
   154  	d.miniBlockIdx++
   155  	return nil
   156  }
   157  
   158  // Decode retrieves min(remaining values, len(out)) values from the data and returns the number
   159  // of values actually decoded and any errors encountered.
   160  func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) {
   161  	max := shared_utils.Min(len(out), int(d.totalValues))
   162  	if max == 0 {
   163  		return 0, nil
   164  	}
   165  
   166  	out = out[:max]
   167  	if !d.usedFirst { // starting value to calculate deltas against
   168  		out[0] = int32(d.lastVal)
   169  		out = out[1:]
   170  		d.usedFirst = true
   171  	}
   172  
   173  	var err error
   174  	for len(out) > 0 { // unpack mini blocks until we get all the values we need
   175  		if d.currentBlockVals == 0 {
   176  			err = d.initBlock()
   177  			if err != nil {
   178  				return 0, err
   179  			}
   180  		}
   181  		if d.currentMiniBlockVals == 0 {
   182  			err = d.unpackNextMini()
   183  		}
   184  		if err != nil {
   185  			return 0, err
   186  		}
   187  
   188  		// copy as many values from our mini block as we can into out
   189  		start := int(d.valsPerMini - d.currentMiniBlockVals)
   190  		numCopied := copy(out, d.miniBlockValues[start:])
   191  
   192  		out = out[numCopied:]
   193  		d.currentBlockVals -= uint32(numCopied)
   194  		d.currentMiniBlockVals -= uint32(numCopied)
   195  	}
   196  	d.nvals -= max
   197  	return max, nil
   198  }
   199  
   200  // DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap
   201  func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
   202  	toread := len(out) - nullCount
   203  	values, err := d.Decode(out[:toread])
   204  	if err != nil {
   205  		return values, err
   206  	}
   207  	if values != toread {
   208  		return values, errors.New("parquet: number of values / definition levels read did not match")
   209  	}
   210  
   211  	return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
   212  }
   213  
   214  // Type returns the physical parquet type that this decoder decodes, in this case Int32
   215  func (DeltaBitPackInt32Decoder) Type() parquet.Type {
   216  	return parquet.Types.Int32
   217  }
   218  
// DeltaBitPackInt64Decoder decodes a delta bit packed int64 column of data.
type DeltaBitPackInt64Decoder struct {
	*deltaBitPackDecoder

	// miniBlockValues holds the decoded values of the most recently unpacked
	// miniblock; Decode copies out of it starting where the previous copy stopped.
	miniBlockValues []int64
}
   225  
   226  func (d *DeltaBitPackInt64Decoder) unpackNextMini() error {
   227  	if d.miniBlockValues == nil {
   228  		d.miniBlockValues = make([]int64, 0, int(d.valsPerMini))
   229  	} else {
   230  		d.miniBlockValues = d.miniBlockValues[:0]
   231  	}
   232  
   233  	d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
   234  	d.currentMiniBlockVals = d.valsPerMini
   235  
   236  	for j := 0; j < int(d.valsPerMini); j++ {
   237  		delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
   238  		if !ok {
   239  			return errors.New("parquet: eof exception")
   240  		}
   241  
   242  		d.lastVal += int64(delta) + d.minDelta
   243  		d.miniBlockValues = append(d.miniBlockValues, d.lastVal)
   244  	}
   245  	d.miniBlockIdx++
   246  	return nil
   247  }
   248  
   249  // Decode retrieves min(remaining values, len(out)) values from the data and returns the number
   250  // of values actually decoded and any errors encountered.
   251  func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) {
   252  	max := shared_utils.Min(len(out), d.nvals)
   253  	if max == 0 {
   254  		return 0, nil
   255  	}
   256  
   257  	out = out[:max]
   258  	if !d.usedFirst {
   259  		out[0] = d.lastVal
   260  		out = out[1:]
   261  		d.usedFirst = true
   262  	}
   263  
   264  	var err error
   265  	for len(out) > 0 {
   266  		if d.currentBlockVals == 0 {
   267  			err = d.initBlock()
   268  			if err != nil {
   269  				return 0, err
   270  			}
   271  		}
   272  		if d.currentMiniBlockVals == 0 {
   273  			err = d.unpackNextMini()
   274  		}
   275  
   276  		if err != nil {
   277  			return 0, err
   278  		}
   279  
   280  		start := int(d.valsPerMini - d.currentMiniBlockVals)
   281  		numCopied := copy(out, d.miniBlockValues[start:])
   282  
   283  		out = out[numCopied:]
   284  		d.currentBlockVals -= uint32(numCopied)
   285  		d.currentMiniBlockVals -= uint32(numCopied)
   286  	}
   287  	d.nvals -= max
   288  	return max, nil
   289  }
   290  
   291  // Type returns the physical parquet type that this decoder decodes, in this case Int64
   292  func (DeltaBitPackInt64Decoder) Type() parquet.Type {
   293  	return parquet.Types.Int64
   294  }
   295  
   296  // DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap
   297  func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
   298  	toread := len(out) - nullCount
   299  	values, err := d.Decode(out[:toread])
   300  	if err != nil {
   301  		return values, err
   302  	}
   303  	if values != toread {
   304  		return values, errors.New("parquet: number of values / definition levels read did not match")
   305  	}
   306  
   307  	return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
   308  }
   309  
   310  const (
   311  	// block size must be a multiple of 128
   312  	defaultBlockSize     = 128
   313  	defaultNumMiniBlocks = 4
   314  	// block size / number of mini blocks must result in a multiple of 32
   315  	defaultNumValuesPerMini = 32
   316  	// max size of the header for the delta blocks
   317  	maxHeaderWriterSize = 32
   318  )
   319  
// deltaBitPackEncoder is an encoder for the DeltaBinary Packing format
// as per the parquet spec.
//
// Consists of a header followed by blocks of delta encoded values binary packed.
//
//	Format
//		[header] [block 1] [block 2] ... [block N]
//
//	Header
//		[block size] [number of mini blocks per block] [total value count] [first value]
//
//	Block
//		[min delta] [list of bitwidths of the miniblocks] [miniblocks...]
//
// Blocks are bit-packed into the encoder's sink as they fill up. The header is
// only complete once all values have been Put, so FlushValues writes it into a
// separate buffer and prepends it to the flushed block data.
type deltaBitPackEncoder struct {
	encoder

	// bitWriter writes block data into the sink (created by the first Put of a
	// page); totalVals counts the values Put for the current page (including the
	// first); firstVal is the page's first value; currentVal is the most recent
	// value, from which the next delta is computed.
	bitWriter  *utils.BitWriter
	totalVals  uint64
	firstVal   int64
	currentVal int64

	// Block geometry (set from the package defaults) and the deltas pending for
	// the block currently being filled.
	blockSize     uint64
	miniBlockSize uint64
	numMiniBlocks uint64
	deltas        []int64
}
   349  
   350  // flushBlock flushes out a finished block for writing to the underlying encoder
   351  func (enc *deltaBitPackEncoder) flushBlock() {
   352  	if len(enc.deltas) == 0 {
   353  		return
   354  	}
   355  
   356  	// determine the minimum delta value
   357  	minDelta := int64(math.MaxInt64)
   358  	for _, delta := range enc.deltas {
   359  		if delta < minDelta {
   360  			minDelta = delta
   361  		}
   362  	}
   363  
   364  	enc.bitWriter.WriteZigZagVlqInt(minDelta)
   365  	// reserve enough bytes to write out our miniblock deltas
   366  	offset, _ := enc.bitWriter.SkipBytes(int(enc.numMiniBlocks))
   367  
   368  	valuesToWrite := int64(len(enc.deltas))
   369  	for i := 0; i < int(enc.numMiniBlocks); i++ {
   370  		n := shared_utils.Min(int64(enc.miniBlockSize), valuesToWrite)
   371  		if n == 0 {
   372  			break
   373  		}
   374  
   375  		maxDelta := int64(math.MinInt64)
   376  		start := i * int(enc.miniBlockSize)
   377  		for _, val := range enc.deltas[start : start+int(n)] {
   378  			maxDelta = shared_utils.Max(maxDelta, val)
   379  		}
   380  
   381  		// compute bit width to store (max_delta - min_delta)
   382  		width := uint(bits.Len64(uint64(maxDelta - minDelta)))
   383  		// write out the bit width we used into the bytes we reserved earlier
   384  		enc.bitWriter.WriteAt([]byte{byte(width)}, int64(offset+i))
   385  
   386  		// write out our deltas
   387  		for _, val := range enc.deltas[start : start+int(n)] {
   388  			enc.bitWriter.WriteValue(uint64(val-minDelta), width)
   389  		}
   390  
   391  		valuesToWrite -= n
   392  
   393  		// pad the last block if n < miniBlockSize
   394  		for ; n < int64(enc.miniBlockSize); n++ {
   395  			enc.bitWriter.WriteValue(0, width)
   396  		}
   397  	}
   398  	enc.deltas = enc.deltas[:0]
   399  }
   400  
   401  // putInternal is the implementation for actually writing data which must be
   402  // integral data as int, int8, int32, or int64.
   403  func (enc *deltaBitPackEncoder) putInternal(data interface{}) {
   404  	v := reflect.ValueOf(data)
   405  	if v.Len() == 0 {
   406  		return
   407  	}
   408  
   409  	idx := 0
   410  	if enc.totalVals == 0 {
   411  		enc.blockSize = defaultBlockSize
   412  		enc.numMiniBlocks = defaultNumMiniBlocks
   413  		enc.miniBlockSize = defaultNumValuesPerMini
   414  
   415  		enc.firstVal = v.Index(0).Int()
   416  		enc.currentVal = enc.firstVal
   417  		idx = 1
   418  
   419  		enc.bitWriter = utils.NewBitWriter(enc.sink)
   420  	}
   421  
   422  	enc.totalVals += uint64(v.Len())
   423  	for ; idx < v.Len(); idx++ {
   424  		val := v.Index(idx).Int()
   425  		enc.deltas = append(enc.deltas, val-enc.currentVal)
   426  		enc.currentVal = val
   427  		if len(enc.deltas) == int(enc.blockSize) {
   428  			enc.flushBlock()
   429  		}
   430  	}
   431  }
   432  
   433  // FlushValues flushes any remaining data and returns the finished encoded buffer
   434  // or returns nil and any error encountered during flushing.
   435  func (enc *deltaBitPackEncoder) FlushValues() (Buffer, error) {
   436  	if enc.bitWriter != nil {
   437  		// write any remaining values
   438  		enc.flushBlock()
   439  		enc.bitWriter.Flush(true)
   440  	} else {
   441  		enc.blockSize = defaultBlockSize
   442  		enc.numMiniBlocks = defaultNumMiniBlocks
   443  		enc.miniBlockSize = defaultNumValuesPerMini
   444  	}
   445  
   446  	buffer := make([]byte, maxHeaderWriterSize)
   447  	headerWriter := utils.NewBitWriter(utils.NewWriterAtBuffer(buffer))
   448  
   449  	headerWriter.WriteVlqInt(uint64(enc.blockSize))
   450  	headerWriter.WriteVlqInt(uint64(enc.numMiniBlocks))
   451  	headerWriter.WriteVlqInt(uint64(enc.totalVals))
   452  	headerWriter.WriteZigZagVlqInt(int64(enc.firstVal))
   453  	headerWriter.Flush(false)
   454  
   455  	buffer = buffer[:headerWriter.Written()]
   456  	enc.totalVals = 0
   457  
   458  	if enc.bitWriter != nil {
   459  		flushed := enc.sink.Finish()
   460  		defer flushed.Release()
   461  
   462  		buffer = append(buffer, flushed.Buf()[:enc.bitWriter.Written()]...)
   463  	}
   464  	return poolBuffer{memory.NewBufferBytes(buffer)}, nil
   465  }
   466  
   467  // EstimatedDataEncodedSize returns the current amount of data actually flushed out and written
   468  func (enc *deltaBitPackEncoder) EstimatedDataEncodedSize() int64 {
   469  	return int64(enc.bitWriter.Written())
   470  }
   471  
// DeltaBitPackInt32Encoder is an encoder for the delta bitpacking encoding for int32 data.
// All encoding state lives in the embedded *deltaBitPackEncoder, which this
// type adapts to typed []int32 input.
type DeltaBitPackInt32Encoder struct {
	*deltaBitPackEncoder
}
   476  
   477  // Put writes the values from the provided slice of int32 to the encoder
   478  func (enc DeltaBitPackInt32Encoder) Put(in []int32) {
   479  	enc.putInternal(in)
   480  }
   481  
   482  // PutSpaced takes a slice of int32 along with a bitmap that describes the nulls and an offset into the bitmap
   483  // in order to write spaced data to the encoder.
   484  func (enc DeltaBitPackInt32Encoder) PutSpaced(in []int32, validBits []byte, validBitsOffset int64) {
   485  	buffer := memory.NewResizableBuffer(enc.mem)
   486  	buffer.Reserve(arrow.Int32Traits.BytesRequired(len(in)))
   487  	defer buffer.Release()
   488  
   489  	data := arrow.Int32Traits.CastFromBytes(buffer.Buf())
   490  	nvalid := spacedCompress(in, data, validBits, validBitsOffset)
   491  	enc.Put(data[:nvalid])
   492  }
   493  
   494  // Type returns the underlying physical type this encoder works with, in this case Int32
   495  func (DeltaBitPackInt32Encoder) Type() parquet.Type {
   496  	return parquet.Types.Int32
   497  }
   498  
// DeltaBitPackInt64Encoder is an encoder for the delta bitpacking encoding for int64 data.
// All encoding state lives in the embedded *deltaBitPackEncoder, which this
// type adapts to typed []int64 input.
type DeltaBitPackInt64Encoder struct {
	*deltaBitPackEncoder
}
   503  
   504  // Put writes the values from the provided slice of int64 to the encoder
   505  func (enc DeltaBitPackInt64Encoder) Put(in []int64) {
   506  	enc.putInternal(in)
   507  }
   508  
   509  // PutSpaced takes a slice of int64 along with a bitmap that describes the nulls and an offset into the bitmap
   510  // in order to write spaced data to the encoder.
   511  func (enc DeltaBitPackInt64Encoder) PutSpaced(in []int64, validBits []byte, validBitsOffset int64) {
   512  	buffer := memory.NewResizableBuffer(enc.mem)
   513  	buffer.Reserve(arrow.Int64Traits.BytesRequired(len(in)))
   514  	defer buffer.Release()
   515  
   516  	data := arrow.Int64Traits.CastFromBytes(buffer.Buf())
   517  	nvalid := spacedCompress(in, data, validBits, validBitsOffset)
   518  	enc.Put(data[:nvalid])
   519  }
   520  
   521  // Type returns the underlying physical type this encoder works with, in this case Int64
   522  func (DeltaBitPackInt64Encoder) Type() parquet.Type {
   523  	return parquet.Types.Int64
   524  }
   525  

View as plain text