...

Source file src/github.com/syndtr/goleveldb/leveldb/table/reader.go

Documentation: github.com/syndtr/goleveldb/leveldb/table

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package table
     8  
     9  import (
    10  	"encoding/binary"
    11  	"fmt"
    12  	"io"
    13  	"sort"
    14  	"strings"
    15  	"sync"
    16  
    17  	"github.com/golang/snappy"
    18  
    19  	"github.com/syndtr/goleveldb/leveldb/cache"
    20  	"github.com/syndtr/goleveldb/leveldb/comparer"
    21  	"github.com/syndtr/goleveldb/leveldb/errors"
    22  	"github.com/syndtr/goleveldb/leveldb/filter"
    23  	"github.com/syndtr/goleveldb/leveldb/iterator"
    24  	"github.com/syndtr/goleveldb/leveldb/opt"
    25  	"github.com/syndtr/goleveldb/leveldb/storage"
    26  	"github.com/syndtr/goleveldb/leveldb/util"
    27  )
    28  
    29  // Reader errors.
    30  var (
    31  	ErrNotFound       = errors.ErrNotFound
    32  	ErrReaderReleased = errors.New("leveldb/table: reader released")
    33  	ErrIterReleased   = errors.New("leveldb/table: iterator released")
    34  )
    35  
    36  // ErrCorrupted describes error due to corruption. This error will be wrapped
    37  // with errors.ErrCorrupted.
    38  type ErrCorrupted struct {
    39  	Pos    int64
    40  	Size   int64
    41  	Kind   string
    42  	Reason string
    43  }
    44  
    45  func (e *ErrCorrupted) Error() string {
    46  	return fmt.Sprintf("leveldb/table: corruption on %s (pos=%d): %s", e.Kind, e.Pos, e.Reason)
    47  }
    48  
    49  func max(x, y int) int {
    50  	if x > y {
    51  		return x
    52  	}
    53  	return y
    54  }
    55  
    56  type block struct {
    57  	bpool          *util.BufferPool
    58  	bh             blockHandle
    59  	data           []byte
    60  	restartsLen    int
    61  	restartsOffset int
    62  }
    63  
    64  func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
    65  	index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
    66  		offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
    67  		offset++                                    // shared always zero, since this is a restart point
    68  		v1, n1 := binary.Uvarint(b.data[offset:])   // key length
    69  		_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
    70  		m := offset + n1 + n2
    71  		return cmp.Compare(b.data[m:m+int(v1)], key) > 0
    72  	}) + rstart - 1
    73  	if index < rstart {
    74  		// The smallest key is greater-than key sought.
    75  		index = rstart
    76  	}
    77  	offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
    78  	return
    79  }
    80  
    81  func (b *block) restartIndex(rstart, rlimit, offset int) int {
    82  	return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
    83  		return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
    84  	}) + rstart - 1
    85  }
    86  
    87  func (b *block) restartOffset(index int) int {
    88  	return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
    89  }
    90  
    91  func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) {
    92  	if offset >= b.restartsOffset {
    93  		if offset != b.restartsOffset {
    94  			err = &ErrCorrupted{Reason: "entries offset not aligned"}
    95  		}
    96  		return
    97  	}
    98  	v0, n0 := binary.Uvarint(b.data[offset:])       // Shared prefix length
    99  	v1, n1 := binary.Uvarint(b.data[offset+n0:])    // Key length
   100  	v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length
   101  	m := n0 + n1 + n2
   102  	n = m + int(v1) + int(v2)
   103  	if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset {
   104  		err = &ErrCorrupted{Reason: "entries corrupted"}
   105  		return
   106  	}
   107  	key = b.data[offset+m : offset+m+int(v1)]
   108  	value = b.data[offset+m+int(v1) : offset+n]
   109  	nShared = int(v0)
   110  	return
   111  }
   112  
   113  func (b *block) Release() {
   114  	b.bpool.Put(b.data)
   115  	b.bpool = nil
   116  	b.data = nil
   117  }
   118  
   119  type dir int
   120  
   121  const (
   122  	dirReleased dir = iota - 1
   123  	dirSOI
   124  	dirEOI
   125  	dirBackward
   126  	dirForward
   127  )
   128  
   129  type blockIter struct {
   130  	tr            *Reader
   131  	block         *block
   132  	blockReleaser util.Releaser
   133  	releaser      util.Releaser
   134  	key, value    []byte
   135  	offset        int
   136  	// Previous offset, only filled by Next.
   137  	prevOffset   int
   138  	prevNode     []int
   139  	prevKeys     []byte
   140  	restartIndex int
   141  	// Iterator direction.
   142  	dir dir
   143  	// Restart index slice range.
   144  	riStart int
   145  	riLimit int
   146  	// Offset slice range.
   147  	offsetStart     int
   148  	offsetRealStart int
   149  	offsetLimit     int
   150  	// Error.
   151  	err error
   152  }
   153  
   154  func (i *blockIter) sErr(err error) {
   155  	i.err = err
   156  	i.key = nil
   157  	i.value = nil
   158  	i.prevNode = nil
   159  	i.prevKeys = nil
   160  }
   161  
   162  func (i *blockIter) reset() {
   163  	if i.dir == dirBackward {
   164  		i.prevNode = i.prevNode[:0]
   165  		i.prevKeys = i.prevKeys[:0]
   166  	}
   167  	i.restartIndex = i.riStart
   168  	i.offset = i.offsetStart
   169  	i.dir = dirSOI
   170  	i.key = i.key[:0]
   171  	i.value = nil
   172  }
   173  
   174  func (i *blockIter) isFirst() bool {
   175  	switch i.dir {
   176  	case dirForward:
   177  		return i.prevOffset == i.offsetRealStart
   178  	case dirBackward:
   179  		return len(i.prevNode) == 1 && i.restartIndex == i.riStart
   180  	}
   181  	return false
   182  }
   183  
   184  func (i *blockIter) isLast() bool {
   185  	switch i.dir {
   186  	case dirForward, dirBackward:
   187  		return i.offset == i.offsetLimit
   188  	}
   189  	return false
   190  }
   191  
   192  func (i *blockIter) First() bool {
   193  	if i.err != nil {
   194  		return false
   195  	} else if i.dir == dirReleased {
   196  		i.err = ErrIterReleased
   197  		return false
   198  	}
   199  
   200  	if i.dir == dirBackward {
   201  		i.prevNode = i.prevNode[:0]
   202  		i.prevKeys = i.prevKeys[:0]
   203  	}
   204  	i.dir = dirSOI
   205  	return i.Next()
   206  }
   207  
   208  func (i *blockIter) Last() bool {
   209  	if i.err != nil {
   210  		return false
   211  	} else if i.dir == dirReleased {
   212  		i.err = ErrIterReleased
   213  		return false
   214  	}
   215  
   216  	if i.dir == dirBackward {
   217  		i.prevNode = i.prevNode[:0]
   218  		i.prevKeys = i.prevKeys[:0]
   219  	}
   220  	i.dir = dirEOI
   221  	return i.Prev()
   222  }
   223  
   224  func (i *blockIter) Seek(key []byte) bool {
   225  	if i.err != nil {
   226  		return false
   227  	} else if i.dir == dirReleased {
   228  		i.err = ErrIterReleased
   229  		return false
   230  	}
   231  
   232  	ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
   233  	if err != nil {
   234  		i.sErr(err)
   235  		return false
   236  	}
   237  	i.restartIndex = ri
   238  	i.offset = max(i.offsetStart, offset)
   239  	if i.dir == dirSOI || i.dir == dirEOI {
   240  		i.dir = dirForward
   241  	}
   242  	for i.Next() {
   243  		if i.tr.cmp.Compare(i.key, key) >= 0 {
   244  			return true
   245  		}
   246  	}
   247  	return false
   248  }
   249  
// Next moves to the next entry of the slice range and reports whether one
// exists.
//
// From dirSOI it starts at the restart point that begins the range; from
// dirBackward it discards the backward-iteration cache and continues from the
// current offset. Entries before offsetRealStart are decoded only to rebuild
// the prefix-compressed key and are never returned. Reaching offsetLimit, or
// the end of the entries area, moves the iterator to dirEOI. Landing past
// offsetLimit, or a malformed entry, records a corruption error.
func (i *blockIter) Next() bool {
	if i.dir == dirEOI || i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	if i.dir == dirSOI {
		i.restartIndex = i.riStart
		i.offset = i.offsetStart
	} else if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	}
	// Walk from the restart point up to the first in-range entry so that
	// i.key accumulates the shared prefix those entries depend on.
	for i.offset < i.offsetRealStart {
		key, value, nShared, n, err := i.block.entry(i.offset)
		if err != nil {
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
			return false
		}
		if n == 0 {
			i.dir = dirEOI
			return false
		}
		i.key = append(i.key[:nShared], key...)
		i.value = value
		i.offset += n
	}
	// End of range; overshooting offsetLimit means entries and limit do not
	// line up.
	if i.offset >= i.offsetLimit {
		i.dir = dirEOI
		if i.offset != i.offsetLimit {
			i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
		}
		return false
	}
	key, value, nShared, n, err := i.block.entry(i.offset)
	if err != nil {
		i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
		return false
	}
	// n == 0: entry reached the end of the entries area.
	if n == 0 {
		i.dir = dirEOI
		return false
	}
	i.key = append(i.key[:nShared], key...)
	i.value = value
	// Remember where the current entry starts; Prev and isFirst use it.
	i.prevOffset = i.offset
	i.offset += n
	i.dir = dirForward
	return true
}
   302  
// Prev moves to the previous entry of the slice range and reports whether one
// exists; when there is none the iterator moves to dirSOI.
//
// Keys are prefix-compressed, so entries can only be decoded forward from a
// restart point. Prev therefore decodes the restart interval that holds the
// target entry once. While doing so it caches every in-range entry before the
// target in prevNode/prevKeys, so that later Prev calls within the same
// interval are served from the cache. prevNode[0] holds the interval's restart
// offset; each following triple is (offset into prevKeys, value offset in the
// block data, value length).
func (i *blockIter) Prev() bool {
	if i.dir == dirSOI || i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	var ri int
	if i.dir == dirForward {
		// Change direction.
		// The target is the entry that ends where the current entry starts.
		i.offset = i.prevOffset
		if i.offset == i.offsetRealStart {
			i.dir = dirSOI
			return false
		}
		ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset)
		i.dir = dirBackward
	} else if i.dir == dirEOI {
		// At the end of iterator.
		i.restartIndex = i.riLimit
		i.offset = i.offsetLimit
		if i.offset == i.offsetRealStart {
			i.dir = dirSOI
			return false
		}
		ri = i.riLimit - 1
		i.dir = dirBackward
	} else if len(i.prevNode) == 1 {
		// This is the end of a restart range.
		// Only the restart offset is left, so the previous entry lies in
		// the preceding restart interval.
		i.offset = i.prevNode[0]
		i.prevNode = i.prevNode[:0]
		if i.restartIndex == i.riStart {
			i.dir = dirSOI
			return false
		}
		i.restartIndex--
		ri = i.restartIndex
	} else {
		// In the middle of restart range, get from cache.
		n := len(i.prevNode) - 3
		node := i.prevNode[n:]
		i.prevNode = i.prevNode[:n]
		// Get the key.
		ko := node[0]
		i.key = append(i.key[:0], i.prevKeys[ko:]...)
		i.prevKeys = i.prevKeys[:ko]
		// Get the value.
		vo := node[1]
		vl := vo + node[2]
		i.value = i.block.data[vo:vl]
		i.offset = vl
		return true
	}
	// Build entries cache.
	i.key = i.key[:0]
	i.value = nil
	offset := i.block.restartOffset(ri)
	// If the target ends exactly at a restart point, it belongs to the
	// previous restart interval.
	if offset == i.offset {
		ri--
		if ri < 0 {
			i.dir = dirSOI
			return false
		}
		offset = i.block.restartOffset(ri)
	}
	i.prevNode = append(i.prevNode, offset)
	for {
		key, value, nShared, n, err := i.block.entry(offset)
		if err != nil {
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
			return false
		}
		// Entries before offsetRealStart only contribute to the key prefix
		// and are never cached or returned.
		if offset >= i.offsetRealStart {
			if i.value != nil {
				// Appends 3 variables:
				// 1. Previous keys offset
				// 2. Value offset in the data block
				// 3. Value length
				i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value))
				i.prevKeys = append(i.prevKeys, i.key...)
			}
			i.value = value
		}
		i.key = append(i.key[:nShared], key...)
		offset += n
		// Stop if target offset reached.
		if offset >= i.offset {
			if offset != i.offset {
				i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
				return false
			}

			break
		}
	}
	i.restartIndex = ri
	i.offset = offset
	return true
}
   403  
   404  func (i *blockIter) Key() []byte {
   405  	if i.err != nil || i.dir <= dirEOI {
   406  		return nil
   407  	}
   408  	return i.key
   409  }
   410  
   411  func (i *blockIter) Value() []byte {
   412  	if i.err != nil || i.dir <= dirEOI {
   413  		return nil
   414  	}
   415  	return i.value
   416  }
   417  
   418  func (i *blockIter) Release() {
   419  	if i.dir != dirReleased {
   420  		i.tr = nil
   421  		i.block = nil
   422  		i.prevNode = nil
   423  		i.prevKeys = nil
   424  		i.key = nil
   425  		i.value = nil
   426  		i.dir = dirReleased
   427  		if i.blockReleaser != nil {
   428  			i.blockReleaser.Release()
   429  			i.blockReleaser = nil
   430  		}
   431  		if i.releaser != nil {
   432  			i.releaser.Release()
   433  			i.releaser = nil
   434  		}
   435  	}
   436  }
   437  
   438  func (i *blockIter) SetReleaser(releaser util.Releaser) {
   439  	if i.dir == dirReleased {
   440  		panic(util.ErrReleased)
   441  	}
   442  	if i.releaser != nil && releaser != nil {
   443  		panic(util.ErrHasReleaser)
   444  	}
   445  	i.releaser = releaser
   446  }
   447  
   448  func (i *blockIter) Valid() bool {
   449  	return i.err == nil && (i.dir == dirBackward || i.dir == dirForward)
   450  }
   451  
   452  func (i *blockIter) Error() error {
   453  	return i.err
   454  }
   455  
   456  type filterBlock struct {
   457  	bpool      *util.BufferPool
   458  	data       []byte
   459  	oOffset    int
   460  	baseLg     uint
   461  	filtersNum int
   462  }
   463  
   464  func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
   465  	i := int(offset >> b.baseLg)
   466  	if i < b.filtersNum {
   467  		o := b.data[b.oOffset+i*4:]
   468  		n := int(binary.LittleEndian.Uint32(o))
   469  		m := int(binary.LittleEndian.Uint32(o[4:]))
   470  		if n < m && m <= b.oOffset {
   471  			return filter.Contains(b.data[n:m], key)
   472  		} else if n == m {
   473  			return false
   474  		}
   475  	}
   476  	return true
   477  }
   478  
   479  func (b *filterBlock) Release() {
   480  	b.bpool.Put(b.data)
   481  	b.bpool = nil
   482  	b.data = nil
   483  }
   484  
   485  type indexIter struct {
   486  	*blockIter
   487  	tr    *Reader
   488  	slice *util.Range
   489  	// Options
   490  	fillCache bool
   491  }
   492  
   493  func (i *indexIter) Get() iterator.Iterator {
   494  	value := i.Value()
   495  	if value == nil {
   496  		return nil
   497  	}
   498  	dataBH, n := decodeBlockHandle(value)
   499  	if n == 0 {
   500  		return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle"))
   501  	}
   502  
   503  	var slice *util.Range
   504  	if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
   505  		slice = i.slice
   506  	}
   507  	return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache)
   508  }
   509  
   510  // Reader is a table reader.
   511  type Reader struct {
   512  	mu     sync.RWMutex
   513  	fd     storage.FileDesc
   514  	reader io.ReaderAt
   515  	cache  *cache.NamespaceGetter
   516  	err    error
   517  	bpool  *util.BufferPool
   518  	// Options
   519  	o              *opt.Options
   520  	cmp            comparer.Comparer
   521  	filter         filter.Filter
   522  	verifyChecksum bool
   523  
   524  	dataEnd                   int64
   525  	metaBH, indexBH, filterBH blockHandle
   526  	indexBlock                *block
   527  	filterBlock               *filterBlock
   528  }
   529  
   530  func (r *Reader) blockKind(bh blockHandle) string {
   531  	switch bh.offset {
   532  	case r.metaBH.offset:
   533  		return "meta-block"
   534  	case r.indexBH.offset:
   535  		return "index-block"
   536  	case r.filterBH.offset:
   537  		if r.filterBH.length > 0 {
   538  			return "filter-block"
   539  		}
   540  	}
   541  	return "data-block"
   542  }
   543  
   544  func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
   545  	return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}
   546  }
   547  
   548  func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
   549  	return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason)
   550  }
   551  
   552  func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error {
   553  	if cerr, ok := err.(*ErrCorrupted); ok {
   554  		cerr.Pos = int64(bh.offset)
   555  		cerr.Size = int64(bh.length)
   556  		cerr.Kind = r.blockKind(bh)
   557  		return &errors.ErrCorrupted{Fd: r.fd, Err: cerr}
   558  	}
   559  	return err
   560  }
   561  
   562  func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) {
   563  	data := r.bpool.Get(int(bh.length + blockTrailerLen))
   564  	if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
   565  		return nil, err
   566  	}
   567  
   568  	if verifyChecksum {
   569  		n := bh.length + 1
   570  		checksum0 := binary.LittleEndian.Uint32(data[n:])
   571  		checksum1 := util.NewCRC(data[:n]).Value()
   572  		if checksum0 != checksum1 {
   573  			r.bpool.Put(data)
   574  			return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1))
   575  		}
   576  	}
   577  
   578  	switch data[bh.length] {
   579  	case blockTypeNoCompression:
   580  		data = data[:bh.length]
   581  	case blockTypeSnappyCompression:
   582  		decLen, err := snappy.DecodedLen(data[:bh.length])
   583  		if err != nil {
   584  			r.bpool.Put(data)
   585  			return nil, r.newErrCorruptedBH(bh, err.Error())
   586  		}
   587  		decData := r.bpool.Get(decLen)
   588  		decData, err = snappy.Decode(decData, data[:bh.length])
   589  		r.bpool.Put(data)
   590  		if err != nil {
   591  			r.bpool.Put(decData)
   592  			return nil, r.newErrCorruptedBH(bh, err.Error())
   593  		}
   594  		data = decData
   595  	default:
   596  		r.bpool.Put(data)
   597  		return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length]))
   598  	}
   599  	return data, nil
   600  }
   601  
   602  func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) {
   603  	data, err := r.readRawBlock(bh, verifyChecksum)
   604  	if err != nil {
   605  		return nil, err
   606  	}
   607  	restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
   608  	b := &block{
   609  		bpool:          r.bpool,
   610  		bh:             bh,
   611  		data:           data,
   612  		restartsLen:    restartsLen,
   613  		restartsOffset: len(data) - (restartsLen+1)*4,
   614  	}
   615  	return b, nil
   616  }
   617  
   618  func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
   619  	if r.cache != nil {
   620  		var (
   621  			err error
   622  			ch  *cache.Handle
   623  		)
   624  		if fillCache {
   625  			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
   626  				var b *block
   627  				b, err = r.readBlock(bh, verifyChecksum)
   628  				if err != nil {
   629  					return 0, nil
   630  				}
   631  				return cap(b.data), b
   632  			})
   633  		} else {
   634  			ch = r.cache.Get(bh.offset, nil)
   635  		}
   636  		if ch != nil {
   637  			b, ok := ch.Value().(*block)
   638  			if !ok {
   639  				ch.Release()
   640  				return nil, nil, errors.New("leveldb/table: inconsistent block type")
   641  			}
   642  			return b, ch, err
   643  		} else if err != nil {
   644  			return nil, nil, err
   645  		}
   646  	}
   647  
   648  	b, err := r.readBlock(bh, verifyChecksum)
   649  	return b, b, err
   650  }
   651  
   652  func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
   653  	data, err := r.readRawBlock(bh, true)
   654  	if err != nil {
   655  		return nil, err
   656  	}
   657  	n := len(data)
   658  	if n < 5 {
   659  		return nil, r.newErrCorruptedBH(bh, "too short")
   660  	}
   661  	m := n - 5
   662  	oOffset := int(binary.LittleEndian.Uint32(data[m:]))
   663  	if oOffset > m {
   664  		return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset")
   665  	}
   666  	b := &filterBlock{
   667  		bpool:      r.bpool,
   668  		data:       data,
   669  		oOffset:    oOffset,
   670  		baseLg:     uint(data[n-1]),
   671  		filtersNum: (m - oOffset) / 4,
   672  	}
   673  	return b, nil
   674  }
   675  
   676  func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
   677  	if r.cache != nil {
   678  		var (
   679  			err error
   680  			ch  *cache.Handle
   681  		)
   682  		if fillCache {
   683  			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
   684  				var b *filterBlock
   685  				b, err = r.readFilterBlock(bh)
   686  				if err != nil {
   687  					return 0, nil
   688  				}
   689  				return cap(b.data), b
   690  			})
   691  		} else {
   692  			ch = r.cache.Get(bh.offset, nil)
   693  		}
   694  		if ch != nil {
   695  			b, ok := ch.Value().(*filterBlock)
   696  			if !ok {
   697  				ch.Release()
   698  				return nil, nil, errors.New("leveldb/table: inconsistent block type")
   699  			}
   700  			return b, ch, err
   701  		} else if err != nil {
   702  			return nil, nil, err
   703  		}
   704  	}
   705  
   706  	b, err := r.readFilterBlock(bh)
   707  	return b, b, err
   708  }
   709  
   710  func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
   711  	if r.indexBlock == nil {
   712  		return r.readBlockCached(r.indexBH, true, fillCache)
   713  	}
   714  	return r.indexBlock, util.NoopReleaser{}, nil
   715  }
   716  
   717  func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
   718  	if r.filterBlock == nil {
   719  		return r.readFilterBlockCached(r.filterBH, fillCache)
   720  	}
   721  	return r.filterBlock, util.NoopReleaser{}, nil
   722  }
   723  
// newBlockIter creates an iterator over block b that releases bReleaser when
// it is itself released. A nil slice iterates the whole block.
//
// Otherwise the bounds are narrowed with Seek:
//   - Start (if set) becomes the first in-range entry, i.e. the first key
//     >= Start. Decoding starts from the restart point at or before it.
//   - Limit (if set) ends the range at the first key >= Limit, excluding it.
//     With inclLimit, that entry is included and the range ends before the
//     following one.
//
// If the narrowed range is inverted, the iterator carries an error.
func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
	bi := &blockIter{
		tr:            r,
		block:         b,
		blockReleaser: bReleaser,
		// Valid key should never be nil.
		key:             make([]byte, 0),
		dir:             dirSOI,
		riStart:         0,
		riLimit:         b.restartsLen,
		offsetStart:     0,
		offsetRealStart: 0,
		offsetLimit:     b.restartsOffset,
	}
	if slice != nil {
		if slice.Start != nil {
			if bi.Seek(slice.Start) {
				// prevOffset is the start of the first entry >= Start.
				bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
				bi.offsetStart = b.restartOffset(bi.riStart)
				bi.offsetRealStart = bi.prevOffset
			} else {
				// No entry >= Start: the range is empty.
				bi.riStart = b.restartsLen
				bi.offsetStart = b.restartsOffset
				bi.offsetRealStart = b.restartsOffset
			}
		}
		if slice.Limit != nil {
			// If Seek finds nothing (or, with inclLimit, the found entry is
			// the last one) the limit stays at the end of the block.
			if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
				bi.offsetLimit = bi.prevOffset
				bi.riLimit = bi.restartIndex + 1
			}
		}
		// Undo the positioning done by the Seek calls above.
		bi.reset()
		if bi.offsetStart > bi.offsetLimit {
			bi.sErr(errors.New("leveldb/table: invalid slice range"))
		}
	}
	return bi
}
   763  
   764  func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
   765  	b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache)
   766  	if err != nil {
   767  		return iterator.NewEmptyIterator(err)
   768  	}
   769  	return r.newBlockIter(b, rel, slice, false)
   770  }
   771  
   772  func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
   773  	r.mu.RLock()
   774  	defer r.mu.RUnlock()
   775  
   776  	if r.err != nil {
   777  		return iterator.NewEmptyIterator(r.err)
   778  	}
   779  
   780  	return r.getDataIter(dataBH, slice, verifyChecksum, fillCache)
   781  }
   782  
   783  // NewIterator creates an iterator from the table.
   784  //
   785  // Slice allows slicing the iterator to only contains keys in the given
   786  // range. A nil Range.Start is treated as a key before all keys in the
   787  // table. And a nil Range.Limit is treated as a key after all keys in
   788  // the table.
   789  //
   790  // WARNING: Any slice returned by interator (e.g. slice returned by calling
   791  // Iterator.Key() or Iterator.Key() methods), its content should not be modified
   792  // unless noted otherwise.
   793  //
   794  // The returned iterator is not safe for concurrent use and should be released
   795  // after use.
   796  //
   797  // Also read Iterator documentation of the leveldb/iterator package.
   798  func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
   799  	r.mu.RLock()
   800  	defer r.mu.RUnlock()
   801  
   802  	if r.err != nil {
   803  		return iterator.NewEmptyIterator(r.err)
   804  	}
   805  
   806  	fillCache := !ro.GetDontFillCache()
   807  	indexBlock, rel, err := r.getIndexBlock(fillCache)
   808  	if err != nil {
   809  		return iterator.NewEmptyIterator(err)
   810  	}
   811  	index := &indexIter{
   812  		blockIter: r.newBlockIter(indexBlock, rel, slice, true),
   813  		tr:        r,
   814  		slice:     slice,
   815  		fillCache: !ro.GetDontFillCache(),
   816  	}
   817  	return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader))
   818  }
   819  
   820  func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) {
   821  	r.mu.RLock()
   822  	defer r.mu.RUnlock()
   823  
   824  	if r.err != nil {
   825  		err = r.err
   826  		return
   827  	}
   828  
   829  	indexBlock, rel, err := r.getIndexBlock(true)
   830  	if err != nil {
   831  		return
   832  	}
   833  	defer rel.Release()
   834  
   835  	index := r.newBlockIter(indexBlock, nil, nil, true)
   836  	defer index.Release()
   837  
   838  	if !index.Seek(key) {
   839  		if err = index.Error(); err == nil {
   840  			err = ErrNotFound
   841  		}
   842  		return
   843  	}
   844  
   845  	dataBH, n := decodeBlockHandle(index.Value())
   846  	if n == 0 {
   847  		r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
   848  		return nil, nil, r.err
   849  	}
   850  
   851  	// The filter should only used for exact match.
   852  	if filtered && r.filter != nil {
   853  		filterBlock, frel, ferr := r.getFilterBlock(true)
   854  		if ferr == nil {
   855  			if !filterBlock.contains(r.filter, dataBH.offset, key) {
   856  				frel.Release()
   857  				return nil, nil, ErrNotFound
   858  			}
   859  			frel.Release()
   860  		} else if !errors.IsCorrupted(ferr) {
   861  			return nil, nil, ferr
   862  		}
   863  	}
   864  
   865  	data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
   866  	if !data.Seek(key) {
   867  		data.Release()
   868  		if err = data.Error(); err != nil {
   869  			return
   870  		}
   871  
   872  		// The nearest greater-than key is the first key of the next block.
   873  		if !index.Next() {
   874  			if err = index.Error(); err == nil {
   875  				err = ErrNotFound
   876  			}
   877  			return
   878  		}
   879  
   880  		dataBH, n = decodeBlockHandle(index.Value())
   881  		if n == 0 {
   882  			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
   883  			return nil, nil, r.err
   884  		}
   885  
   886  		data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
   887  		if !data.Next() {
   888  			data.Release()
   889  			if err = data.Error(); err == nil {
   890  				err = ErrNotFound
   891  			}
   892  			return
   893  		}
   894  	}
   895  
   896  	// Key doesn't use block buffer, no need to copy the buffer.
   897  	rkey = data.Key()
   898  	if !noValue {
   899  		if r.bpool == nil {
   900  			value = data.Value()
   901  		} else {
   902  			// Value does use block buffer, and since the buffer will be
   903  			// recycled, it need to be copied.
   904  			value = append([]byte(nil), data.Value()...)
   905  		}
   906  	}
   907  	data.Release()
   908  	return
   909  }
   910  
   911  // Find finds key/value pair whose key is greater than or equal to the
   912  // given key. It returns ErrNotFound if the table doesn't contain
   913  // such pair.
   914  // If filtered is true then the nearest 'block' will be checked against
   915  // 'filter data' (if present) and will immediately return ErrNotFound if
   916  // 'filter data' indicates that such pair doesn't exist.
   917  //
   918  // The caller may modify the contents of the returned slice as it is its
   919  // own copy.
   920  // It is safe to modify the contents of the argument after Find returns.
   921  func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) {
   922  	return r.find(key, filtered, ro, false)
   923  }
   924  
   925  // FindKey finds key that is greater than or equal to the given key.
   926  // It returns ErrNotFound if the table doesn't contain such key.
   927  // If filtered is true then the nearest 'block' will be checked against
   928  // 'filter data' (if present) and will immediately return ErrNotFound if
   929  // 'filter data' indicates that such key doesn't exist.
   930  //
   931  // The caller may modify the contents of the returned slice as it is its
   932  // own copy.
   933  // It is safe to modify the contents of the argument after Find returns.
   934  func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) {
   935  	rkey, _, err = r.find(key, filtered, ro, true)
   936  	return
   937  }
   938  
   939  // Get gets the value for the given key. It returns errors.ErrNotFound
   940  // if the table does not contain the key.
   941  //
   942  // The caller may modify the contents of the returned slice as it is its
   943  // own copy.
   944  // It is safe to modify the contents of the argument after Find returns.
   945  func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
   946  	r.mu.RLock()
   947  	defer r.mu.RUnlock()
   948  
   949  	if r.err != nil {
   950  		err = r.err
   951  		return
   952  	}
   953  
   954  	rkey, value, err := r.find(key, false, ro, false)
   955  	if err == nil && r.cmp.Compare(rkey, key) != 0 {
   956  		value = nil
   957  		err = ErrNotFound
   958  	}
   959  	return
   960  }
   961  
   962  // OffsetOf returns approximate offset for the given key.
   963  //
   964  // It is safe to modify the contents of the argument after Get returns.
   965  func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
   966  	r.mu.RLock()
   967  	defer r.mu.RUnlock()
   968  
   969  	if r.err != nil {
   970  		err = r.err
   971  		return
   972  	}
   973  
   974  	indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
   975  	if err != nil {
   976  		return
   977  	}
   978  	defer rel.Release()
   979  
   980  	index := r.newBlockIter(indexBlock, nil, nil, true)
   981  	defer index.Release()
   982  	if index.Seek(key) {
   983  		dataBH, n := decodeBlockHandle(index.Value())
   984  		if n == 0 {
   985  			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
   986  			return
   987  		}
   988  		offset = int64(dataBH.offset)
   989  		return
   990  	}
   991  	err = index.Error()
   992  	if err == nil {
   993  		offset = r.dataEnd
   994  	}
   995  	return
   996  }
   997  
   998  // Release implements util.Releaser.
   999  // It also close the file if it is an io.Closer.
  1000  func (r *Reader) Release() {
  1001  	r.mu.Lock()
  1002  	defer r.mu.Unlock()
  1003  
  1004  	if closer, ok := r.reader.(io.Closer); ok {
  1005  		closer.Close()
  1006  	}
  1007  	if r.indexBlock != nil {
  1008  		r.indexBlock.Release()
  1009  		r.indexBlock = nil
  1010  	}
  1011  	if r.filterBlock != nil {
  1012  		r.filterBlock.Release()
  1013  		r.filterBlock = nil
  1014  	}
  1015  	r.reader = nil
  1016  	r.cache = nil
  1017  	r.bpool = nil
  1018  	r.err = ErrReaderReleased
  1019  }
  1020  
  1021  // NewReader creates a new initialized table reader for the file.
  1022  // The fi, cache and bpool is optional and can be nil.
  1023  //
  1024  // The returned table reader instance is safe for concurrent use.
  1025  func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
  1026  	if f == nil {
  1027  		return nil, errors.New("leveldb/table: nil file")
  1028  	}
  1029  
  1030  	r := &Reader{
  1031  		fd:             fd,
  1032  		reader:         f,
  1033  		cache:          cache,
  1034  		bpool:          bpool,
  1035  		o:              o,
  1036  		cmp:            o.GetComparer(),
  1037  		verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
  1038  	}
  1039  
  1040  	if size < footerLen {
  1041  		r.err = r.newErrCorrupted(0, size, "table", "too small")
  1042  		return r, nil
  1043  	}
  1044  
  1045  	footerPos := size - footerLen
  1046  	var footer [footerLen]byte
  1047  	if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
  1048  		return nil, err
  1049  	}
  1050  	if string(footer[footerLen-len(magic):footerLen]) != magic {
  1051  		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")
  1052  		return r, nil
  1053  	}
  1054  
  1055  	var n int
  1056  	// Decode the metaindex block handle.
  1057  	r.metaBH, n = decodeBlockHandle(footer[:])
  1058  	if n == 0 {
  1059  		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")
  1060  		return r, nil
  1061  	}
  1062  
  1063  	// Decode the index block handle.
  1064  	r.indexBH, n = decodeBlockHandle(footer[n:])
  1065  	if n == 0 {
  1066  		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")
  1067  		return r, nil
  1068  	}
  1069  
  1070  	// Read metaindex block.
  1071  	metaBlock, err := r.readBlock(r.metaBH, true)
  1072  	if err != nil {
  1073  		if errors.IsCorrupted(err) {
  1074  			r.err = err
  1075  			return r, nil
  1076  		}
  1077  		return nil, err
  1078  	}
  1079  
  1080  	// Set data end.
  1081  	r.dataEnd = int64(r.metaBH.offset)
  1082  
  1083  	// Read metaindex.
  1084  	metaIter := r.newBlockIter(metaBlock, nil, nil, true)
  1085  	for metaIter.Next() {
  1086  		key := string(metaIter.Key())
  1087  		if !strings.HasPrefix(key, "filter.") {
  1088  			continue
  1089  		}
  1090  		fn := key[7:]
  1091  		if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
  1092  			r.filter = f0
  1093  		} else {
  1094  			for _, f0 := range o.GetAltFilters() {
  1095  				if f0.Name() == fn {
  1096  					r.filter = f0
  1097  					break
  1098  				}
  1099  			}
  1100  		}
  1101  		if r.filter != nil {
  1102  			filterBH, n := decodeBlockHandle(metaIter.Value())
  1103  			if n == 0 {
  1104  				continue
  1105  			}
  1106  			r.filterBH = filterBH
  1107  			// Update data end.
  1108  			r.dataEnd = int64(filterBH.offset)
  1109  			break
  1110  		}
  1111  	}
  1112  	metaIter.Release()
  1113  	metaBlock.Release()
  1114  
  1115  	// Cache index and filter block locally, since we don't have global cache.
  1116  	if cache == nil {
  1117  		r.indexBlock, err = r.readBlock(r.indexBH, true)
  1118  		if err != nil {
  1119  			if errors.IsCorrupted(err) {
  1120  				r.err = err
  1121  				return r, nil
  1122  			}
  1123  			return nil, err
  1124  		}
  1125  		if r.filter != nil {
  1126  			r.filterBlock, err = r.readFilterBlock(r.filterBH)
  1127  			if err != nil {
  1128  				if !errors.IsCorrupted(err) {
  1129  					return nil, err
  1130  				}
  1131  
  1132  				// Don't use filter then.
  1133  				r.filter = nil
  1134  			}
  1135  		}
  1136  	}
  1137  
  1138  	return r, nil
  1139  }
  1140  

View as plain text