...

Source file src/github.com/apache/arrow/go/v15/parquet/internal/encoding/boolean_decoder.go

Documentation: github.com/apache/arrow/go/v15/parquet/internal/encoding

     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package encoding
    18  
    19  import (
    20  	"bytes"
    21  	"encoding/binary"
    22  	"errors"
    23  	"fmt"
    24  	"io"
    25  
    26  	"github.com/apache/arrow/go/v15/arrow/bitutil"
    27  	shared_utils "github.com/apache/arrow/go/v15/internal/utils"
    28  	"github.com/apache/arrow/go/v15/parquet"
    29  	"github.com/apache/arrow/go/v15/parquet/internal/utils"
    30  )
    31  
    32  // PlainBooleanDecoder is for the Plain Encoding type, there is no
    33  // dictionary decoding for bools.
    34  type PlainBooleanDecoder struct {
    35  	decoder
    36  
    37  	bitOffset int
    38  }
    39  
    40  // Type for the PlainBooleanDecoder is parquet.Types.Boolean
    41  func (PlainBooleanDecoder) Type() parquet.Type {
    42  	return parquet.Types.Boolean
    43  }
    44  
    45  func (dec *PlainBooleanDecoder) SetData(nvals int, data []byte) error {
    46  	if err := dec.decoder.SetData(nvals, data); err != nil {
    47  		return err
    48  	}
    49  	dec.bitOffset = 0
    50  	return nil
    51  }
    52  
    53  // Decode fills out with bools decoded from the data at the current point
    54  // or until we reach the end of the data.
    55  //
    56  // Returns the number of values decoded
    57  func (dec *PlainBooleanDecoder) Decode(out []bool) (int, error) {
    58  	max := shared_utils.Min(len(out), dec.nvals)
    59  
    60  	// attempts to read all remaining bool values from the current data byte
    61  	unalignedExtract := func(i int) int {
    62  		for ; dec.bitOffset < 8 && i < max; i, dec.bitOffset = i+1, dec.bitOffset+1 {
    63  			out[i] = (dec.data[0] & byte(1<<dec.bitOffset)) != 0
    64  		}
    65  		if dec.bitOffset == 8 {
    66  			// we read every bit from this byte
    67  			dec.bitOffset = 0
    68  			dec.data = dec.data[1:] // move data forward
    69  		}
    70  		return i // return the next index for out[]
    71  	}
    72  
    73  	// if we aren't at a byte boundary, then get bools until we hit
    74  	// a byte boundary with the bit offset.
    75  	i := 0
    76  	if dec.bitOffset != 0 {
    77  		i = unalignedExtract(i)
    78  	}
    79  
    80  	// determine the number of full bytes worth of bits we can decode
    81  	// given the number of values we want to decode.
    82  	bitsRemain := max - i
    83  	batch := (bitsRemain / 8) * 8
    84  	if batch > 0 { // only go in here if there's at least one full byte to decode
    85  		// determine the number of aligned bytes we can grab using SIMD optimized
    86  		// functions to improve performance.
    87  		alignedBytes := bitutil.BytesForBits(int64(batch))
    88  		utils.BytesToBools(dec.data[:alignedBytes], out[i:])
    89  
    90  		dec.data = dec.data[alignedBytes:] // move data forward
    91  		i += int(alignedBytes) * 8
    92  	}
    93  
    94  	// grab any trailing bits now that we've got our aligned bytes.
    95  	_ = unalignedExtract(i)
    96  
    97  	dec.nvals -= max
    98  	return max, nil
    99  }
   100  
   101  // DecodeSpaced is like Decode except it expands the values to leave spaces for null
   102  // as determined by the validBits bitmap.
   103  func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
   104  	if nullCount > 0 {
   105  		toRead := len(out) - nullCount
   106  		valuesRead, err := dec.Decode(out[:toRead])
   107  		if err != nil {
   108  			return 0, err
   109  		}
   110  		if valuesRead != toRead {
   111  			return valuesRead, errors.New("parquet: boolean decoder: number of values / definition levels read did not match")
   112  		}
   113  		return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
   114  	}
   115  	return dec.Decode(out)
   116  }
   117  
   118  type RleBooleanDecoder struct {
   119  	decoder
   120  
   121  	rleDec *utils.RleDecoder
   122  }
   123  
   124  func (RleBooleanDecoder) Type() parquet.Type {
   125  	return parquet.Types.Boolean
   126  }
   127  
   128  func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
   129  	dec.nvals = nvals
   130  
   131  	if len(data) < 4 {
   132  		return fmt.Errorf("invalid length - %d (corrupt data page?)", len(data))
   133  	}
   134  
   135  	// load the first 4 bytes in little-endian which indicates the length
   136  	nbytes := binary.LittleEndian.Uint32(data[:4])
   137  	if nbytes > uint32(len(data)-4) {
   138  		return fmt.Errorf("received invalid number of bytes - %d (corrupt data page?)", nbytes)
   139  	}
   140  
   141  	dec.data = data[4:]
   142  	if dec.rleDec == nil {
   143  		dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
   144  	} else {
   145  		dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
   146  	}
   147  	return nil
   148  }
   149  
   150  func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
   151  	max := shared_utils.Min(len(out), dec.nvals)
   152  
   153  	var (
   154  		buf [1024]uint64
   155  		n   = max
   156  	)
   157  
   158  	for n > 0 {
   159  		batch := shared_utils.Min(len(buf), n)
   160  		decoded := dec.rleDec.GetBatch(buf[:batch])
   161  		if decoded != batch {
   162  			return max - n, io.ErrUnexpectedEOF
   163  		}
   164  
   165  		for i := 0; i < batch; i++ {
   166  			out[i] = buf[i] != 0
   167  		}
   168  		n -= batch
   169  		out = out[batch:]
   170  	}
   171  
   172  	dec.nvals -= max
   173  	return max, nil
   174  }
   175  
   176  func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
   177  	if nullCount > 0 {
   178  		toRead := len(out) - nullCount
   179  		valuesRead, err := dec.Decode(out[:toRead])
   180  		if err != nil {
   181  			return 0, err
   182  		}
   183  		if valuesRead != toRead {
   184  			return valuesRead, errors.New("parquet: rle boolean decoder: number of values / definition levels read did not match")
   185  		}
   186  		return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
   187  	}
   188  	return dec.Decode(out)
   189  }
   190  

View as plain text