...

Source file src/github.com/apache/arrow/go/v15/parquet/file/column_reader.go

Documentation: github.com/apache/arrow/go/v15/parquet/file

     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package file
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"sync"
    23  
    24  	"github.com/apache/arrow/go/v15/arrow/memory"
    25  	"github.com/apache/arrow/go/v15/internal/utils"
    26  	"github.com/apache/arrow/go/v15/parquet"
    27  	"github.com/apache/arrow/go/v15/parquet/internal/encoding"
    28  	"github.com/apache/arrow/go/v15/parquet/internal/encryption"
    29  	format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
    30  	"github.com/apache/arrow/go/v15/parquet/schema"
    31  	"golang.org/x/xerrors"
    32  )
    33  
const (
	// defaultMaxPageHeaderSize is the default maximum page header size
	// in bytes: 4 * 1024 * 1024 (4 MiB).
	defaultMaxPageHeaderSize = 4 * 1024 * 1024
	// defaultPageHeaderSize is the default expected page header size
	// in bytes: 16 * 1024 (16 KiB).
	defaultPageHeaderSize = 16 * 1024
)
    40  
    41  //go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=../internal/encoding/physical_types.tmpldata column_reader_types.gen.go.tmpl
    42  
    43  func isDictIndexEncoding(e format.Encoding) bool {
    44  	return e == format.Encoding_RLE_DICTIONARY || e == format.Encoding_PLAIN_DICTIONARY
    45  }
    46  
// CryptoContext is a context for keeping track of the current methods for decrypting.
// It keeps track of the row group and column numbers along with references to the
// decryptor objects.
type CryptoContext struct {
	// NOTE(review): presumably signals that decryption should begin with a
	// dictionary page rather than a data page — confirm against the page reader.
	StartDecryptWithDictionaryPage bool
	// RowGroupOrdinal is the row group number being tracked.
	RowGroupOrdinal                int16
	// ColumnOrdinal is the column number being tracked.
	ColumnOrdinal                  int16
	// MetaDecryptor and DataDecryptor are the decryptor objects referenced by
	// this context. NOTE(review): presumably one handles page metadata (headers)
	// and the other page data — confirm against usage in the page reader.
	MetaDecryptor                  encryption.Decryptor
	DataDecryptor                  encryption.Decryptor
}
    57  
// ColumnChunkReader is the basic interface for all column readers. It will use
// a page reader to read all the pages in a column chunk from a row group.
//
// To actually read out the column data, you need to convert to the properly
// typed ColumnChunkReader type such as *BooleanColumnChunkReader etc.
//
// Some things to clarify when working with column readers:
//
// "Values" refers to the physical data values in a data page.
//
// This is separate from the number of "rows" in a column and the total number
// of "elements" in a column because null values aren't stored physically in the
// data page but are represented via definition levels, so the number of values
// in a column can be less than the number of rows.
//
// The total number of "elements" in a column also differs because of potential
// repeated fields, where you can have multiple values in the page which
// together make up a single element (such as a list) or, depending on the
// repetition level and definition level, could represent an entire null list
// or just a null element inside of a list.
type ColumnChunkReader interface {
	// HasNext returns whether there is more data to be read in this column
	// and row group.
	HasNext() bool
	// Type returns the underlying physical type of the column.
	Type() parquet.Type
	// Descriptor returns the column schema container.
	Descriptor() *schema.Column
	// Err returns the error encountered if HasNext returned false because of
	// an error. Otherwise it returns nil, meaning the end of the column was
	// simply reached.
	Err() error
	// consumeBufferedValues marks the given number of buffered values as
	// consumed, skipping over them.
	consumeBufferedValues(int64)
	// numAvailValues returns the number of buffered values that have not been
	// decoded yet. When this returns 0, you're at the end of a page.
	numAvailValues() int64
	// readDefinitionLevels reads the definition levels and returns the number
	// of definition levels read and the number of values to be read (the number
	// of definition levels equal to the max definition level). It also populates
	// the passed in slice, which should be sized appropriately.
	readDefinitionLevels(levels []int16) (int, int64)
	// readRepetitionLevels reads the repetition levels and returns the number
	// of repetition levels read. It also populates the passed in slice, which
	// should be sized appropriately.
	readRepetitionLevels(levels []int16) int
	// pager returns the current PageReader.
	//
	// A column is made up of potentially multiple pages across potentially
	// multiple row groups. A PageReader allows looping through the pages in a
	// single row group. When moving to another row group for reading, use
	// setPageReader to re-use the column reader for reading the pages of the
	// new row group.
	pager() PageReader
	// setPageReader sets a page reader into the column reader so it can be
	// reused.
	//
	// This will clear any current error in the reader but does not
	// automatically read the first page of the page reader passed in; that
	// happens on the next call to HasNext, which will read in the next page.
	setPageReader(PageReader)
}
   114  
// columnChunkReader holds the state shared by all of the typed column chunk
// readers: the column descriptor, the page reader supplying pages, the level
// decoders, and the value decoder for the page currently being read.
type columnChunkReader struct {
	// descr is the schema descriptor of the column being read
	descr             *schema.Column
	// rdr supplies the pages of the column chunk
	rdr               PageReader
	// decoders for the repetition and definition levels of the current data page
	repetitionDecoder encoding.LevelDecoder
	definitionDecoder encoding.LevelDecoder

	// the most recently read page, along with the encoding and value decoder
	// selected for it
	curPage     Page
	curEncoding format.Encoding
	curDecoder  encoding.TypedDecoder

	// number of currently buffered values in the current page
	numBuffered int64
	// the number of values we've decoded so far
	numDecoded int64
	mem        memory.Allocator
	// pool providing *memory.Buffer scratch buffers for temporary usage
	bufferPool *sync.Pool

	// cache of value decoders keyed by encoding so they can be reused across
	// pages; it is replaced with an empty map when a new page reader is set
	decoders      map[format.Encoding]encoding.TypedDecoder
	// decoderTraits constructs value decoders for this column's physical type
	decoderTraits encoding.DecoderTraits

	// is set when an error is encountered
	err          error
	// reusable scratch space for definition levels when the caller supplies none
	defLvlBuffer []int16

	// set to true whenever a dictionary page has been configured
	newDictionary bool
}
   141  
   142  // NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will
   143  // provide the pages of data for this column. The type is determined from the column passed in.
   144  //
   145  // In addition to the page reader and allocator, a pointer to a shared sync.Pool is expected to provide buffers for temporary
   146  // usage to minimize allocations. The bufferPool should provide *memory.Buffer objects that can be resized as necessary, buffers
   147  // should have `ResizeNoShrink(0)` called on them before being put back into the pool.
   148  func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader {
   149  	base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: bufferPool}
   150  	switch descr.PhysicalType() {
   151  	case parquet.Types.FixedLenByteArray:
   152  		base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
   153  		return &FixedLenByteArrayColumnChunkReader{base}
   154  	case parquet.Types.Float:
   155  		base.decoderTraits = &encoding.Float32DecoderTraits
   156  		return &Float32ColumnChunkReader{base}
   157  	case parquet.Types.Double:
   158  		base.decoderTraits = &encoding.Float64DecoderTraits
   159  		return &Float64ColumnChunkReader{base}
   160  	case parquet.Types.ByteArray:
   161  		base.decoderTraits = &encoding.ByteArrayDecoderTraits
   162  		return &ByteArrayColumnChunkReader{base}
   163  	case parquet.Types.Int32:
   164  		base.decoderTraits = &encoding.Int32DecoderTraits
   165  		return &Int32ColumnChunkReader{base}
   166  	case parquet.Types.Int64:
   167  		base.decoderTraits = &encoding.Int64DecoderTraits
   168  		return &Int64ColumnChunkReader{base}
   169  	case parquet.Types.Int96:
   170  		base.decoderTraits = &encoding.Int96DecoderTraits
   171  		return &Int96ColumnChunkReader{base}
   172  	case parquet.Types.Boolean:
   173  		base.decoderTraits = &encoding.BooleanDecoderTraits
   174  		return &BooleanColumnChunkReader{base}
   175  	}
   176  	return nil
   177  }
   178  
   179  func (c *columnChunkReader) Err() error                    { return c.err }
   180  func (c *columnChunkReader) Type() parquet.Type            { return c.descr.PhysicalType() }
   181  func (c *columnChunkReader) Descriptor() *schema.Column    { return c.descr }
   182  func (c *columnChunkReader) consumeBufferedValues(n int64) { c.numDecoded += n }
   183  func (c *columnChunkReader) numAvailValues() int64         { return c.numBuffered - c.numDecoded }
   184  func (c *columnChunkReader) pager() PageReader             { return c.rdr }
   185  func (c *columnChunkReader) setPageReader(rdr PageReader) {
   186  	c.rdr, c.err = rdr, nil
   187  	c.decoders = make(map[format.Encoding]encoding.TypedDecoder)
   188  	c.numBuffered, c.numDecoded = 0, 0
   189  }
   190  
   191  func (c *columnChunkReader) getDefLvlBuffer(sz int64) []int16 {
   192  	if int64(len(c.defLvlBuffer)) < sz {
   193  		c.defLvlBuffer = make([]int16, sz)
   194  		return c.defLvlBuffer
   195  	}
   196  
   197  	return c.defLvlBuffer[:sz]
   198  }
   199  
   200  // HasNext returns whether there is more data to be read in this column
   201  // and row group.
   202  func (c *columnChunkReader) HasNext() bool {
   203  	if c.numBuffered == 0 || c.numDecoded == c.numBuffered {
   204  		return c.readNewPage() && c.numBuffered != 0
   205  	}
   206  	return true
   207  }
   208  
   209  func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
   210  	enc := page.encoding
   211  	if enc == format.Encoding_PLAIN_DICTIONARY || enc == format.Encoding_PLAIN {
   212  		enc = format.Encoding_RLE_DICTIONARY
   213  	}
   214  
   215  	if _, ok := c.decoders[enc]; ok {
   216  		return xerrors.New("parquet: column chunk cannot have more than one dictionary.")
   217  	}
   218  
   219  	switch page.Encoding() {
   220  	case format.Encoding_PLAIN, format.Encoding_PLAIN_DICTIONARY:
   221  		dict := c.decoderTraits.Decoder(parquet.Encodings.Plain, c.descr, false, c.mem)
   222  		dict.SetData(int(page.NumValues()), page.Data())
   223  
   224  		decoder := c.decoderTraits.Decoder(parquet.Encodings.Plain, c.descr, true, c.mem).(encoding.DictDecoder)
   225  		decoder.SetDict(dict)
   226  		c.decoders[enc] = decoder
   227  	default:
   228  		return xerrors.New("parquet: dictionary index must be plain encoding")
   229  	}
   230  
   231  	c.newDictionary = true
   232  	c.curDecoder = c.decoders[enc]
   233  	return nil
   234  }
   235  
   236  // read a new page from the page reader
   237  func (c *columnChunkReader) readNewPage() bool {
   238  	for c.rdr.Next() { // keep going until we get a data page
   239  		c.curPage = c.rdr.Page()
   240  		if c.curPage == nil {
   241  			break
   242  		}
   243  
   244  		var lvlByteLen int64
   245  		switch p := c.curPage.(type) {
   246  		case *DictionaryPage:
   247  			if err := c.configureDict(p); err != nil {
   248  				c.err = err
   249  				return false
   250  			}
   251  			continue
   252  		case *DataPageV1:
   253  			lvlByteLen, c.err = c.initLevelDecodersV1(p, p.repLvlEncoding, p.defLvlEncoding)
   254  			if c.err != nil {
   255  				return false
   256  			}
   257  		case *DataPageV2:
   258  			lvlByteLen, c.err = c.initLevelDecodersV2(p)
   259  			if c.err != nil {
   260  				return false
   261  			}
   262  		default:
   263  			// we can skip non-data pages
   264  			continue
   265  		}
   266  
   267  		c.err = c.initDataDecoder(c.curPage, lvlByteLen)
   268  		return c.err == nil
   269  	}
   270  	c.err = c.rdr.Err()
   271  	return false
   272  }
   273  
   274  func (c *columnChunkReader) initLevelDecodersV2(page *DataPageV2) (int64, error) {
   275  	c.numBuffered = int64(page.nvals)
   276  	c.numDecoded = 0
   277  	buf := page.Data()
   278  	totalLvlLen := int64(page.repLvlByteLen) + int64(page.defLvlByteLen)
   279  
   280  	if totalLvlLen > int64(len(buf)) {
   281  		return totalLvlLen, xerrors.New("parquet: data page too small for levels (corrupt header?)")
   282  	}
   283  
   284  	if c.descr.MaxRepetitionLevel() > 0 {
   285  		c.repetitionDecoder.SetDataV2(page.repLvlByteLen, c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
   286  	}
   287  	// ARROW-17453: Some writers will write repetition levels even when
   288  	// the max repetition level is 0, so we should respect the value
   289  	// in the page header regardless of whether MaxRepetitionLevel is 0
   290  	// or not.
   291  	buf = buf[page.repLvlByteLen:]
   292  
   293  	if c.descr.MaxDefinitionLevel() > 0 {
   294  		c.definitionDecoder.SetDataV2(page.defLvlByteLen, c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
   295  	}
   296  
   297  	return totalLvlLen, nil
   298  }
   299  
   300  func (c *columnChunkReader) initLevelDecodersV1(page *DataPageV1, repLvlEncoding, defLvlEncoding format.Encoding) (int64, error) {
   301  	c.numBuffered = int64(page.nvals)
   302  	c.numDecoded = 0
   303  
   304  	buf := page.Data()
   305  	maxSize := len(buf)
   306  	levelsByteLen := int64(0)
   307  
   308  	// Data page layout: Repetition Levels - Definition Levels - encoded values.
   309  	// Levels are encoded as rle or bit-packed
   310  	if c.descr.MaxRepetitionLevel() > 0 {
   311  		repBytes, err := c.repetitionDecoder.SetData(parquet.Encoding(repLvlEncoding), c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
   312  		if err != nil {
   313  			return levelsByteLen, err
   314  		}
   315  		buf = buf[repBytes:]
   316  		maxSize -= repBytes
   317  		levelsByteLen += int64(repBytes)
   318  	}
   319  
   320  	if c.descr.MaxDefinitionLevel() > 0 {
   321  		defBytes, err := c.definitionDecoder.SetData(parquet.Encoding(defLvlEncoding), c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
   322  		if err != nil {
   323  			return levelsByteLen, err
   324  		}
   325  		levelsByteLen += int64(defBytes)
   326  		maxSize -= defBytes
   327  	}
   328  
   329  	return levelsByteLen, nil
   330  }
   331  
   332  func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
   333  	buf := page.Data()
   334  	if int64(len(buf)) < lvlByteLen {
   335  		return xerrors.New("parquet: page smaller than size of encoded levels")
   336  	}
   337  
   338  	buf = buf[lvlByteLen:]
   339  	encoding := page.Encoding()
   340  
   341  	if isDictIndexEncoding(encoding) {
   342  		encoding = format.Encoding_RLE_DICTIONARY
   343  	}
   344  
   345  	if decoder, ok := c.decoders[encoding]; ok {
   346  		c.curDecoder = decoder
   347  	} else {
   348  		switch encoding {
   349  		case format.Encoding_RLE:
   350  			if c.descr.PhysicalType() != parquet.Types.Boolean {
   351  				return fmt.Errorf("parquet: only boolean supports RLE encoding, got %s", c.descr.PhysicalType())
   352  			}
   353  			fallthrough
   354  		case format.Encoding_PLAIN,
   355  			format.Encoding_DELTA_BYTE_ARRAY,
   356  			format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
   357  			format.Encoding_DELTA_BINARY_PACKED:
   358  			c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
   359  			c.decoders[encoding] = c.curDecoder
   360  		case format.Encoding_RLE_DICTIONARY:
   361  			return errors.New("parquet: dictionary page must be before data page")
   362  		case format.Encoding_BYTE_STREAM_SPLIT:
   363  			return fmt.Errorf("parquet: unsupported data encoding %s", encoding)
   364  		default:
   365  			return fmt.Errorf("parquet: unknown encoding type %s", encoding)
   366  		}
   367  	}
   368  
   369  	c.curEncoding = encoding
   370  	c.curDecoder.SetData(int(c.numBuffered), buf)
   371  	return nil
   372  }
   373  
   374  // readDefinitionLevels decodes the definition levels from the page and returns
   375  // it returns the total number of levels that were decoded (and thus populated
   376  // in the passed in slice) and the number of physical values that exist to read
   377  // (the number of levels that are equal to the max definition level).
   378  //
   379  // If the max definition level is 0, the assumption is that there no nulls in the
   380  // column and therefore no definition levels to read, so it will always return 0, 0
   381  func (c *columnChunkReader) readDefinitionLevels(levels []int16) (totalDecoded int, valuesToRead int64) {
   382  	if c.descr.MaxDefinitionLevel() == 0 {
   383  		return 0, 0
   384  	}
   385  
   386  	return c.definitionDecoder.Decode(levels)
   387  }
   388  
   389  // readRepetitionLevels decodes the repetition levels from the page and returns
   390  // the total number of values decoded (and thus populated in the passed in levels
   391  // slice).
   392  //
   393  // If max repetition level is 0, it is assumed there are no repetition levels,
   394  // and thus will always return 0.
   395  func (c *columnChunkReader) readRepetitionLevels(levels []int16) int {
   396  	if c.descr.MaxRepetitionLevel() == 0 {
   397  		return 0
   398  	}
   399  
   400  	nlevels, _ := c.repetitionDecoder.Decode(levels)
   401  	return nlevels
   402  }
   403  
   404  // determineNumToRead reads the definition levels (and optionally populates the repetition levels)
   405  // in order to determine how many values need to be read to fulfill this batch read.
   406  //
   407  // batchLen is the number of values it is desired to read. defLvls must be either nil (in which case
   408  // a buffer will be used) or must be at least batchLen in length to be safe. repLvls should be either nil
   409  // (in which case it is ignored) or should be at least batchLen in length to be safe.
   410  //
   411  // In the return values: ndef is the number of definition levels that were actually read in which will
   412  // typically be the minimum of batchLen and numAvailValues.
   413  // toRead is the number of physical values that should be read in based on the definition levels (the number
   414  // of definition levels that were equal to maxDefinitionLevel). and err being either nil or any error encountered
   415  func (c *columnChunkReader) determineNumToRead(batchLen int64, defLvls, repLvls []int16) (ndefs int, toRead int64, err error) {
   416  	if !c.HasNext() {
   417  		return 0, 0, c.err
   418  	}
   419  
   420  	size := utils.Min(batchLen, c.numBuffered-c.numDecoded)
   421  
   422  	if c.descr.MaxDefinitionLevel() > 0 {
   423  		if defLvls == nil {
   424  			defLvls = c.getDefLvlBuffer(size)
   425  		}
   426  		ndefs, toRead = c.readDefinitionLevels(defLvls[:size])
   427  	} else {
   428  		toRead = size
   429  	}
   430  
   431  	if c.descr.MaxRepetitionLevel() > 0 && repLvls != nil {
   432  		nreps := c.readRepetitionLevels(repLvls[:size])
   433  		if defLvls != nil && ndefs != nreps {
   434  			err = xerrors.New("parquet: number of decoded rep/def levels did not match")
   435  		}
   436  	}
   437  	return
   438  }
   439  
   440  // skipValues some number of rows using readFn as the function to read the data and throw it away.
   441  // If we can skipValues a whole page based on its metadata, then we do so, otherwise we read the
   442  // page until we have skipped the number of rows desired.
   443  func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, buf []byte) (int64, error)) (int64, error) {
   444  	var err error
   445  	toskip := nvalues
   446  	for c.HasNext() && toskip > 0 {
   447  		// if number to skip is more than the number of undecoded values, skip the page
   448  		if toskip > (c.numBuffered - c.numDecoded) {
   449  			toskip -= c.numBuffered - c.numDecoded
   450  			c.numDecoded = c.numBuffered
   451  		} else {
   452  			var (
   453  				batchSize int64 = 1024
   454  				valsRead  int64 = 0
   455  			)
   456  
   457  			scratch := c.bufferPool.Get().(*memory.Buffer)
   458  			defer func() {
   459  				scratch.ResizeNoShrink(0)
   460  				c.bufferPool.Put(scratch)
   461  			}()
   462  			bufMult := 1
   463  			if c.descr.PhysicalType() == parquet.Types.Boolean {
   464  				// for bools, BytesRequired returns 1 byte per 8 bool, but casting []byte to []bool requires 1 byte per 1 bool
   465  				bufMult = 8
   466  			}
   467  			scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult))
   468  
   469  			for {
   470  				batchSize = utils.Min(batchSize, toskip)
   471  				valsRead, err = readFn(batchSize, scratch.Buf())
   472  				toskip -= valsRead
   473  				if valsRead <= 0 || toskip <= 0 || err != nil {
   474  					break
   475  				}
   476  			}
   477  		}
   478  	}
   479  	if c.err != nil {
   480  		err = c.err
   481  	}
   482  	return nvalues - toskip, err
   483  }
   484  
// readerFunc is the callback readBatch uses to decode physical values. readBatch
// calls it with the number of physical values read so far in the batch as the
// first argument and the number of physical values to read now as the second;
// it returns the number of values it actually read and any error.
type readerFunc func(int64, int64) (int, error)
   486  
   487  // base function for reading a batch of values, this will read until it either reads in batchSize values or
   488  // it hits the end of the column chunk, including reading multiple pages.
   489  //
   490  // totalValues is the total number of values which were read in, and thus would be the total number
   491  // of definition levels and repetition levels which were populated (if they were non-nil). totalRead
   492  // is the number of physical values that were read in (ie: the number of non-null values)
   493  func (c *columnChunkReader) readBatch(batchSize int64, defLvls, repLvls []int16, readFn readerFunc) (totalLvls int64, totalRead int, err error) {
   494  	var (
   495  		read   int
   496  		defs   []int16
   497  		reps   []int16
   498  		ndefs  int
   499  		toRead int64
   500  	)
   501  
   502  	for c.HasNext() && totalLvls < batchSize && err == nil {
   503  		if defLvls != nil {
   504  			defs = defLvls[totalLvls:]
   505  		}
   506  		if repLvls != nil {
   507  			reps = repLvls[totalLvls:]
   508  		}
   509  		ndefs, toRead, err = c.determineNumToRead(batchSize-totalLvls, defs, reps)
   510  		if err != nil {
   511  			return totalLvls, totalRead, err
   512  		}
   513  
   514  		read, err = readFn(int64(totalRead), toRead)
   515  		// the total number of values processed here is the maximum of
   516  		// the number of definition levels or the number of physical values read.
   517  		// if this is a required field, ndefs will be 0 since there is no definition
   518  		// levels stored with it and `read` will be the number of values, otherwise
   519  		// we use ndefs since it will be equal to or greater than read.
   520  		totalVals := int64(utils.Max(ndefs, read))
   521  		c.consumeBufferedValues(totalVals)
   522  
   523  		totalLvls += totalVals
   524  		totalRead += read
   525  	}
   526  	return totalLvls, totalRead, err
   527  }
   528  

View as plain text