1
2
3
4
5
6
7 package table
8
9 import (
10 "encoding/binary"
11 "fmt"
12 "io"
13 "sort"
14 "strings"
15 "sync"
16
17 "github.com/golang/snappy"
18
19 "github.com/syndtr/goleveldb/leveldb/cache"
20 "github.com/syndtr/goleveldb/leveldb/comparer"
21 "github.com/syndtr/goleveldb/leveldb/errors"
22 "github.com/syndtr/goleveldb/leveldb/filter"
23 "github.com/syndtr/goleveldb/leveldb/iterator"
24 "github.com/syndtr/goleveldb/leveldb/opt"
25 "github.com/syndtr/goleveldb/leveldb/storage"
26 "github.com/syndtr/goleveldb/leveldb/util"
27 )
28
29
30 var (
31 ErrNotFound = errors.ErrNotFound
32 ErrReaderReleased = errors.New("leveldb/table: reader released")
33 ErrIterReleased = errors.New("leveldb/table: iterator released")
34 )
35
36
37
// ErrCorrupted describes corruption detected inside a table file.
//
// Pos and Size locate the affected region, Kind names it (for example
// "data-block" or "table-footer") and Reason says what was wrong.
type ErrCorrupted struct {
	Pos    int64
	Size   int64
	Kind   string
	Reason string
}

// Error implements the error interface. Size is not included in the message.
func (e *ErrCorrupted) Error() string {
	prefix := fmt.Sprintf("leveldb/table: corruption on %s (pos=%d): ", e.Kind, e.Pos)
	return prefix + e.Reason
}
48
// max returns the larger of x and y.
func max(x, y int) int {
	if y >= x {
		return y
	}
	return x
}
55
// block is a table block held in memory after decompression.
//
// The tail of data is a restart array of restartsLen little-endian uint32
// entry offsets, followed by a uint32 holding restartsLen itself.
// restartsOffset is where that restart array starts, so entries occupy
// data[:restartsOffset].
type block struct {
	bpool          *util.BufferPool // pool that Release returns data to
	bh             blockHandle      // handle the block was read from; used in error reports
	data           []byte
	restartsLen    int // number of restart points
	restartsOffset int // offset of the restart array within data
}
63
64 func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
65 index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
66 offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
67 offset++
68 v1, n1 := binary.Uvarint(b.data[offset:])
69 _, n2 := binary.Uvarint(b.data[offset+n1:])
70 m := offset + n1 + n2
71 return cmp.Compare(b.data[m:m+int(v1)], key) > 0
72 }) + rstart - 1
73 if index < rstart {
74
75 index = rstart
76 }
77 offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
78 return
79 }
80
81 func (b *block) restartIndex(rstart, rlimit, offset int) int {
82 return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
83 return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
84 }) + rstart - 1
85 }
86
87 func (b *block) restartOffset(index int) int {
88 return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
89 }
90
91 func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) {
92 if offset >= b.restartsOffset {
93 if offset != b.restartsOffset {
94 err = &ErrCorrupted{Reason: "entries offset not aligned"}
95 }
96 return
97 }
98 v0, n0 := binary.Uvarint(b.data[offset:])
99 v1, n1 := binary.Uvarint(b.data[offset+n0:])
100 v2, n2 := binary.Uvarint(b.data[offset+n0+n1:])
101 m := n0 + n1 + n2
102 n = m + int(v1) + int(v2)
103 if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset {
104 err = &ErrCorrupted{Reason: "entries corrupted"}
105 return
106 }
107 key = b.data[offset+m : offset+m+int(v1)]
108 value = b.data[offset+m+int(v1) : offset+n]
109 nShared = int(v0)
110 return
111 }
112
113 func (b *block) Release() {
114 b.bpool.Put(b.data)
115 b.bpool = nil
116 b.data = nil
117 }
118
// dir is a blockIter's position state and last direction of travel.
// The numeric order matters: Key and Value treat every state <= dirEOI as
// "not positioned on an entry".
type dir int

const (
	dirReleased dir = -1 // iterator has been released
	dirSOI      dir = 0  // start of iteration, before the first entry
	dirEOI      dir = 1  // end of iteration, past the last entry
	dirBackward dir = 2  // positioned; last moved by Prev
	dirForward  dir = 3  // positioned; last moved by Next
)
128
// blockIter iterates over the entries of one block, optionally restricted to a
// slice of it (see Reader.newBlockIter).
type blockIter struct {
	tr            *Reader
	block         *block
	blockReleaser util.Releaser // released by Release, before releaser
	releaser      util.Releaser // optional extra releaser installed by SetReleaser
	key, value    []byte        // current entry; key is rebuilt in place, value slices block data
	offset        int           // offset just past the current entry

	// Start offset of the current entry; only set by Next.
	prevOffset int
	// Cache built by Prev for the current restart range: a restart offset
	// followed by (offset into prevKeys, value offset, value length) triples.
	prevNode     []int
	prevKeys     []byte // concatenated full keys referenced by prevNode
	restartIndex int    // restart point the iterator is currently in

	// Iterator position/direction.
	dir dir

	// Restart index range [riStart, riLimit) covered by the slice.
	riStart int
	riLimit int

	// Offset range covered by the slice. offsetStart is the restart point at
	// or before offsetRealStart, which is the first entry yielded;
	// iteration stops at offsetLimit.
	offsetStart     int
	offsetRealStart int
	offsetLimit     int

	// Sticky error; once set, every move fails.
	err error
}
153
154 func (i *blockIter) sErr(err error) {
155 i.err = err
156 i.key = nil
157 i.value = nil
158 i.prevNode = nil
159 i.prevKeys = nil
160 }
161
162 func (i *blockIter) reset() {
163 if i.dir == dirBackward {
164 i.prevNode = i.prevNode[:0]
165 i.prevKeys = i.prevKeys[:0]
166 }
167 i.restartIndex = i.riStart
168 i.offset = i.offsetStart
169 i.dir = dirSOI
170 i.key = i.key[:0]
171 i.value = nil
172 }
173
174 func (i *blockIter) isFirst() bool {
175 switch i.dir {
176 case dirForward:
177 return i.prevOffset == i.offsetRealStart
178 case dirBackward:
179 return len(i.prevNode) == 1 && i.restartIndex == i.riStart
180 }
181 return false
182 }
183
184 func (i *blockIter) isLast() bool {
185 switch i.dir {
186 case dirForward, dirBackward:
187 return i.offset == i.offsetLimit
188 }
189 return false
190 }
191
192 func (i *blockIter) First() bool {
193 if i.err != nil {
194 return false
195 } else if i.dir == dirReleased {
196 i.err = ErrIterReleased
197 return false
198 }
199
200 if i.dir == dirBackward {
201 i.prevNode = i.prevNode[:0]
202 i.prevKeys = i.prevKeys[:0]
203 }
204 i.dir = dirSOI
205 return i.Next()
206 }
207
208 func (i *blockIter) Last() bool {
209 if i.err != nil {
210 return false
211 } else if i.dir == dirReleased {
212 i.err = ErrIterReleased
213 return false
214 }
215
216 if i.dir == dirBackward {
217 i.prevNode = i.prevNode[:0]
218 i.prevKeys = i.prevKeys[:0]
219 }
220 i.dir = dirEOI
221 return i.Prev()
222 }
223
224 func (i *blockIter) Seek(key []byte) bool {
225 if i.err != nil {
226 return false
227 } else if i.dir == dirReleased {
228 i.err = ErrIterReleased
229 return false
230 }
231
232 ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
233 if err != nil {
234 i.sErr(err)
235 return false
236 }
237 i.restartIndex = ri
238 i.offset = max(i.offsetStart, offset)
239 if i.dir == dirSOI || i.dir == dirEOI {
240 i.dir = dirForward
241 }
242 for i.Next() {
243 if i.tr.cmp.Compare(i.key, key) >= 0 {
244 return true
245 }
246 }
247 return false
248 }
249
// Next advances to the next entry of the slice and reports whether one exists.
// From dirSOI it starts at the slice's first restart point. Entries before
// offsetRealStart are decoded only to rebuild the prefix-compressed key and
// are not yielded. Decoding problems set a sticky corruption error.
func (i *blockIter) Next() bool {
	if i.dir == dirEOI || i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	if i.dir == dirSOI {
		i.restartIndex = i.riStart
		i.offset = i.offsetStart
	} else if i.dir == dirBackward {
		// Changing direction: discard the Prev cache.
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	}
	// Walk from the restart point up to the first entry of the slice,
	// accumulating the shared key prefix.
	for i.offset < i.offsetRealStart {
		key, value, nShared, n, err := i.block.entry(i.offset)
		if err != nil {
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
			return false
		}
		if n == 0 {
			i.dir = dirEOI
			return false
		}
		i.key = append(i.key[:nShared], key...)
		i.value = value
		i.offset += n
	}
	if i.offset >= i.offsetLimit {
		i.dir = dirEOI
		if i.offset != i.offsetLimit {
			i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
		}
		return false
	}
	key, value, nShared, n, err := i.block.entry(i.offset)
	if err != nil {
		i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
		return false
	}
	if n == 0 {
		i.dir = dirEOI
		return false
	}
	i.key = append(i.key[:nShared], key...)
	i.value = value
	// Remember where the current entry starts (used by Prev and slicing).
	i.prevOffset = i.offset
	i.offset += n
	i.dir = dirForward
	return true
}
302
// Prev moves to the previous entry of the slice and reports whether one
// exists.
//
// Entries are prefix-compressed and can only be decoded forwards. On entering
// a restart range, Prev therefore decodes it from its restart point up to the
// current entry. It caches each earlier entry's full key (prevKeys) and value
// location (prevNode), so later Prev calls in the same range are served from
// the cache.
func (i *blockIter) Prev() bool {
	if i.dir == dirSOI || i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	var ri int
	if i.dir == dirForward {
		// Change of direction: target the entry before the current one.
		i.offset = i.prevOffset
		if i.offset == i.offsetRealStart {
			i.dir = dirSOI
			return false
		}
		ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset)
		i.dir = dirBackward
	} else if i.dir == dirEOI {
		// At the end of the iterator: target the last entry before offsetLimit.
		i.restartIndex = i.riLimit
		i.offset = i.offsetLimit
		if i.offset == i.offsetRealStart {
			i.dir = dirSOI
			return false
		}
		ri = i.riLimit - 1
		i.dir = dirBackward
	} else if len(i.prevNode) == 1 {
		// Cache exhausted: move to the end of the previous restart range.
		i.offset = i.prevNode[0]
		i.prevNode = i.prevNode[:0]
		if i.restartIndex == i.riStart {
			i.dir = dirSOI
			return false
		}
		i.restartIndex--
		ri = i.restartIndex
	} else {
		// In the middle of a restart range: pop the previous entry from the cache.
		n := len(i.prevNode) - 3
		node := i.prevNode[n:]
		i.prevNode = i.prevNode[:n]

		// Restore the key.
		ko := node[0]
		i.key = append(i.key[:0], i.prevKeys[ko:]...)
		i.prevKeys = i.prevKeys[:ko]

		// Restore the value from its location in the block.
		vo := node[1]
		vl := vo + node[2]
		i.value = i.block.data[vo:vl]
		i.offset = vl
		return true
	}

	// Build the cache for restart range ri, stopping at i.offset.
	i.key = i.key[:0]
	i.value = nil
	offset := i.block.restartOffset(ri)
	if offset == i.offset {
		// The target is the first entry of range ri; use the previous range.
		ri--
		if ri < 0 {
			i.dir = dirSOI
			return false
		}
		offset = i.block.restartOffset(ri)
	}
	i.prevNode = append(i.prevNode, offset)
	for {
		key, value, nShared, n, err := i.block.entry(offset)
		if err != nil {
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
			return false
		}
		if offset >= i.offsetRealStart {
			if i.value != nil {
				// Cache the entry decoded in the previous iteration:
				// 1. offset of its key in prevKeys
				// 2. offset of its value in the block data
				// 3. length of its value
				i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value))
				i.prevKeys = append(i.prevKeys, i.key...)
			}
			i.value = value
		}
		i.key = append(i.key[:nShared], key...)
		offset += n
		// Stop once the target offset is reached.
		if offset >= i.offset {
			if offset != i.offset {
				i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
				return false
			}

			break
		}
	}
	i.restartIndex = ri
	i.offset = offset
	return true
}
403
404 func (i *blockIter) Key() []byte {
405 if i.err != nil || i.dir <= dirEOI {
406 return nil
407 }
408 return i.key
409 }
410
411 func (i *blockIter) Value() []byte {
412 if i.err != nil || i.dir <= dirEOI {
413 return nil
414 }
415 return i.value
416 }
417
418 func (i *blockIter) Release() {
419 if i.dir != dirReleased {
420 i.tr = nil
421 i.block = nil
422 i.prevNode = nil
423 i.prevKeys = nil
424 i.key = nil
425 i.value = nil
426 i.dir = dirReleased
427 if i.blockReleaser != nil {
428 i.blockReleaser.Release()
429 i.blockReleaser = nil
430 }
431 if i.releaser != nil {
432 i.releaser.Release()
433 i.releaser = nil
434 }
435 }
436 }
437
438 func (i *blockIter) SetReleaser(releaser util.Releaser) {
439 if i.dir == dirReleased {
440 panic(util.ErrReleased)
441 }
442 if i.releaser != nil && releaser != nil {
443 panic(util.ErrHasReleaser)
444 }
445 i.releaser = releaser
446 }
447
448 func (i *blockIter) Valid() bool {
449 return i.err == nil && (i.dir == dirBackward || i.dir == dirForward)
450 }
451
452 func (i *blockIter) Error() error {
453 return i.err
454 }
455
// filterBlock is a decoded filter block.
//
// data holds the filter bytes, then filtersNum little-endian uint32 filter
// start offsets beginning at oOffset, then the uint32 oOffset itself and a
// final baseLg byte. Filter k applies to data blocks whose offset >> baseLg
// equals k.
type filterBlock struct {
	bpool      *util.BufferPool // pool that Release returns data to
	data       []byte
	oOffset    int  // start of the filter offsets array
	baseLg     uint // log2 of the data-offset range covered by one filter
	filtersNum int  // number of filters
}
463
464 func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
465 i := int(offset >> b.baseLg)
466 if i < b.filtersNum {
467 o := b.data[b.oOffset+i*4:]
468 n := int(binary.LittleEndian.Uint32(o))
469 m := int(binary.LittleEndian.Uint32(o[4:]))
470 if n < m && m <= b.oOffset {
471 return filter.Contains(b.data[n:m], key)
472 } else if n == m {
473 return false
474 }
475 }
476 return true
477 }
478
479 func (b *filterBlock) Release() {
480 b.bpool.Put(b.data)
481 b.bpool = nil
482 b.data = nil
483 }
484
// indexIter walks an index block and, through Get, opens iterators over the
// data blocks it references. NewIterator wraps it in an indexed iterator.
type indexIter struct {
	*blockIter
	tr    *Reader
	slice *util.Range // optional range; Get passes it only to the first and last data blocks

	fillCache bool // whether data block reads should populate the cache
}
492
493 func (i *indexIter) Get() iterator.Iterator {
494 value := i.Value()
495 if value == nil {
496 return nil
497 }
498 dataBH, n := decodeBlockHandle(value)
499 if n == 0 {
500 return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle"))
501 }
502
503 var slice *util.Range
504 if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
505 slice = i.slice
506 }
507 return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache)
508 }
509
510
// Reader reads a table file. Its exported methods take mu. err is a sticky
// error (corruption found while opening, or ErrReaderReleased after Release)
// that makes every later call fail.
type Reader struct {
	mu     sync.RWMutex
	fd     storage.FileDesc
	reader io.ReaderAt
	cache  *cache.NamespaceGetter // optional block cache, keyed by block offset
	err    error
	bpool  *util.BufferPool

	o              *opt.Options
	cmp            comparer.Comparer
	filter         filter.Filter // nil when no usable filter was found
	verifyChecksum bool          // verify data block checksums (opt.StrictBlockChecksum)

	dataEnd                   int64 // end of the data area: filter block offset, else metaindex offset
	metaBH, indexBH, filterBH blockHandle
	indexBlock                *block       // preloaded index block; only set when cache is nil
	filterBlock               *filterBlock // preloaded filter block; only set when cache is nil
}
529
530 func (r *Reader) blockKind(bh blockHandle) string {
531 switch bh.offset {
532 case r.metaBH.offset:
533 return "meta-block"
534 case r.indexBH.offset:
535 return "index-block"
536 case r.filterBH.offset:
537 if r.filterBH.length > 0 {
538 return "filter-block"
539 }
540 }
541 return "data-block"
542 }
543
544 func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
545 return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}
546 }
547
548 func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
549 return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason)
550 }
551
552 func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error {
553 if cerr, ok := err.(*ErrCorrupted); ok {
554 cerr.Pos = int64(bh.offset)
555 cerr.Size = int64(bh.length)
556 cerr.Kind = r.blockKind(bh)
557 return &errors.ErrCorrupted{Fd: r.fd, Err: cerr}
558 }
559 return err
560 }
561
562 func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) {
563 data := r.bpool.Get(int(bh.length + blockTrailerLen))
564 if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
565 return nil, err
566 }
567
568 if verifyChecksum {
569 n := bh.length + 1
570 checksum0 := binary.LittleEndian.Uint32(data[n:])
571 checksum1 := util.NewCRC(data[:n]).Value()
572 if checksum0 != checksum1 {
573 r.bpool.Put(data)
574 return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1))
575 }
576 }
577
578 switch data[bh.length] {
579 case blockTypeNoCompression:
580 data = data[:bh.length]
581 case blockTypeSnappyCompression:
582 decLen, err := snappy.DecodedLen(data[:bh.length])
583 if err != nil {
584 r.bpool.Put(data)
585 return nil, r.newErrCorruptedBH(bh, err.Error())
586 }
587 decData := r.bpool.Get(decLen)
588 decData, err = snappy.Decode(decData, data[:bh.length])
589 r.bpool.Put(data)
590 if err != nil {
591 r.bpool.Put(decData)
592 return nil, r.newErrCorruptedBH(bh, err.Error())
593 }
594 data = decData
595 default:
596 r.bpool.Put(data)
597 return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length]))
598 }
599 return data, nil
600 }
601
602 func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) {
603 data, err := r.readRawBlock(bh, verifyChecksum)
604 if err != nil {
605 return nil, err
606 }
607 restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
608 b := &block{
609 bpool: r.bpool,
610 bh: bh,
611 data: data,
612 restartsLen: restartsLen,
613 restartsOffset: len(data) - (restartsLen+1)*4,
614 }
615 return b, nil
616 }
617
618 func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
619 if r.cache != nil {
620 var (
621 err error
622 ch *cache.Handle
623 )
624 if fillCache {
625 ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
626 var b *block
627 b, err = r.readBlock(bh, verifyChecksum)
628 if err != nil {
629 return 0, nil
630 }
631 return cap(b.data), b
632 })
633 } else {
634 ch = r.cache.Get(bh.offset, nil)
635 }
636 if ch != nil {
637 b, ok := ch.Value().(*block)
638 if !ok {
639 ch.Release()
640 return nil, nil, errors.New("leveldb/table: inconsistent block type")
641 }
642 return b, ch, err
643 } else if err != nil {
644 return nil, nil, err
645 }
646 }
647
648 b, err := r.readBlock(bh, verifyChecksum)
649 return b, b, err
650 }
651
652 func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
653 data, err := r.readRawBlock(bh, true)
654 if err != nil {
655 return nil, err
656 }
657 n := len(data)
658 if n < 5 {
659 return nil, r.newErrCorruptedBH(bh, "too short")
660 }
661 m := n - 5
662 oOffset := int(binary.LittleEndian.Uint32(data[m:]))
663 if oOffset > m {
664 return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset")
665 }
666 b := &filterBlock{
667 bpool: r.bpool,
668 data: data,
669 oOffset: oOffset,
670 baseLg: uint(data[n-1]),
671 filtersNum: (m - oOffset) / 4,
672 }
673 return b, nil
674 }
675
676 func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
677 if r.cache != nil {
678 var (
679 err error
680 ch *cache.Handle
681 )
682 if fillCache {
683 ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
684 var b *filterBlock
685 b, err = r.readFilterBlock(bh)
686 if err != nil {
687 return 0, nil
688 }
689 return cap(b.data), b
690 })
691 } else {
692 ch = r.cache.Get(bh.offset, nil)
693 }
694 if ch != nil {
695 b, ok := ch.Value().(*filterBlock)
696 if !ok {
697 ch.Release()
698 return nil, nil, errors.New("leveldb/table: inconsistent block type")
699 }
700 return b, ch, err
701 } else if err != nil {
702 return nil, nil, err
703 }
704 }
705
706 b, err := r.readFilterBlock(bh)
707 return b, b, err
708 }
709
710 func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
711 if r.indexBlock == nil {
712 return r.readBlockCached(r.indexBH, true, fillCache)
713 }
714 return r.indexBlock, util.NoopReleaser{}, nil
715 }
716
717 func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
718 if r.filterBlock == nil {
719 return r.readFilterBlockCached(r.filterBH, fillCache)
720 }
721 return r.filterBlock, util.NoopReleaser{}, nil
722 }
723
// newBlockIter returns an iterator over block b. The iterator releases
// bReleaser when it is itself released.
//
// With a non-nil slice, iteration is restricted. It starts at the first entry
// >= slice.Start. It stops before the first entry >= slice.Limit, or just
// after that entry when inclLimit is true. The restart range and offsets are
// located by seeking once per bound. An inverted range leaves the iterator
// with an error.
func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
	bi := &blockIter{
		tr:            r,
		block:         b,
		blockReleaser: bReleaser,
		// Valid key should never be nil.
		key:             make([]byte, 0),
		dir:             dirSOI,
		riStart:         0,
		riLimit:         b.restartsLen,
		offsetStart:     0,
		offsetRealStart: 0,
		offsetLimit:     b.restartsOffset,
	}
	if slice != nil {
		if slice.Start != nil {
			if bi.Seek(slice.Start) {
				// Start iterating at the restart point containing the first
				// entry >= Start, but only yield from that entry on.
				bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
				bi.offsetStart = b.restartOffset(bi.riStart)
				bi.offsetRealStart = bi.prevOffset
			} else {
				// Every key is < Start: the slice is empty.
				bi.riStart = b.restartsLen
				bi.offsetStart = b.restartsOffset
				bi.offsetRealStart = b.restartsOffset
			}
		}
		if slice.Limit != nil {
			if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
				bi.offsetLimit = bi.prevOffset
				bi.riLimit = bi.restartIndex + 1
			}
		}
		bi.reset()
		if bi.offsetStart > bi.offsetLimit {
			bi.sErr(errors.New("leveldb/table: invalid slice range"))
		}
	}
	return bi
}
763
764 func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
765 b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache)
766 if err != nil {
767 return iterator.NewEmptyIterator(err)
768 }
769 return r.newBlockIter(b, rel, slice, false)
770 }
771
772 func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
773 r.mu.RLock()
774 defer r.mu.RUnlock()
775
776 if r.err != nil {
777 return iterator.NewEmptyIterator(r.err)
778 }
779
780 return r.getDataIter(dataBH, slice, verifyChecksum, fillCache)
781 }
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798 func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
799 r.mu.RLock()
800 defer r.mu.RUnlock()
801
802 if r.err != nil {
803 return iterator.NewEmptyIterator(r.err)
804 }
805
806 fillCache := !ro.GetDontFillCache()
807 indexBlock, rel, err := r.getIndexBlock(fillCache)
808 if err != nil {
809 return iterator.NewEmptyIterator(err)
810 }
811 index := &indexIter{
812 blockIter: r.newBlockIter(indexBlock, rel, slice, true),
813 tr: r,
814 slice: slice,
815 fillCache: !ro.GetDontFillCache(),
816 }
817 return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader))
818 }
819
// find looks up the first entry whose key is >= key.
//
// It seeks the index block to the data block that may hold key. If filtered
// is set and a filter exists, it consults the filter; a corrupted filter block
// is ignored. It then seeks within the data block. If that block has no such
// key, the first entry of the next data block is used. rkey is the entry's
// key; value is its value unless noValue is set. ErrNotFound is returned when
// there is no such entry or the filter rules key out.
//
// NOTE(review): r.err is assigned below while only r.mu.RLock is held, which
// races with concurrent readers of r.err — confirm this is acceptable.
func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		err = r.err
		return
	}

	indexBlock, rel, err := r.getIndexBlock(true)
	if err != nil {
		return
	}
	defer rel.Release()

	index := r.newBlockIter(indexBlock, nil, nil, true)
	defer index.Release()

	if !index.Seek(key) {
		if err = index.Error(); err == nil {
			err = ErrNotFound
		}
		return
	}

	dataBH, n := decodeBlockHandle(index.Value())
	if n == 0 {
		r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
		return nil, nil, r.err
	}

	// The filter is only consulted when the caller asks for it (filtered).
	if filtered && r.filter != nil {
		filterBlock, frel, ferr := r.getFilterBlock(true)
		if ferr == nil {
			if !filterBlock.contains(r.filter, dataBH.offset, key) {
				frel.Release()
				return nil, nil, ErrNotFound
			}
			frel.Release()
		} else if !errors.IsCorrupted(ferr) {
			return nil, nil, ferr
		}
	}

	data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
	if !data.Seek(key) {
		data.Release()
		if err = data.Error(); err != nil {
			return
		}

		// The nearest greater key is the first key of the next data block.
		if !index.Next() {
			if err = index.Error(); err == nil {
				err = ErrNotFound
			}
			return
		}

		dataBH, n = decodeBlockHandle(index.Value())
		if n == 0 {
			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
			return nil, nil, r.err
		}

		data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
		if !data.Next() {
			data.Release()
			if err = data.Error(); err == nil {
				err = ErrNotFound
			}
			return
		}
	}

	// The key is held in the iterator's own buffer, not the block buffer,
	// so it stays valid after data.Release.
	rkey = data.Key()
	if !noValue {
		if r.bpool == nil {
			value = data.Value()
		} else {
			// The value points into the block buffer, which may be recycled
			// through the pool, so copy it.
			value = append([]byte(nil), data.Value()...)
		}
	}
	data.Release()
	return
}
910
911
912
913
914
915
916
917
918
919
920
921 func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) {
922 return r.find(key, filtered, ro, false)
923 }
924
925
926
927
928
929
930
931
932
933
934 func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) {
935 rkey, _, err = r.find(key, filtered, ro, true)
936 return
937 }
938
939
940
941
942
943
944
945 func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
946 r.mu.RLock()
947 defer r.mu.RUnlock()
948
949 if r.err != nil {
950 err = r.err
951 return
952 }
953
954 rkey, value, err := r.find(key, false, ro, false)
955 if err == nil && r.cmp.Compare(rkey, key) != 0 {
956 value = nil
957 err = ErrNotFound
958 }
959 return
960 }
961
962
963
964
965 func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
966 r.mu.RLock()
967 defer r.mu.RUnlock()
968
969 if r.err != nil {
970 err = r.err
971 return
972 }
973
974 indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
975 if err != nil {
976 return
977 }
978 defer rel.Release()
979
980 index := r.newBlockIter(indexBlock, nil, nil, true)
981 defer index.Release()
982 if index.Seek(key) {
983 dataBH, n := decodeBlockHandle(index.Value())
984 if n == 0 {
985 r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
986 return
987 }
988 offset = int64(dataBH.offset)
989 return
990 }
991 err = index.Error()
992 if err == nil {
993 offset = r.dataEnd
994 }
995 return
996 }
997
998
999
1000 func (r *Reader) Release() {
1001 r.mu.Lock()
1002 defer r.mu.Unlock()
1003
1004 if closer, ok := r.reader.(io.Closer); ok {
1005 closer.Close()
1006 }
1007 if r.indexBlock != nil {
1008 r.indexBlock.Release()
1009 r.indexBlock = nil
1010 }
1011 if r.filterBlock != nil {
1012 r.filterBlock.Release()
1013 r.filterBlock = nil
1014 }
1015 r.reader = nil
1016 r.cache = nil
1017 r.bpool = nil
1018 r.err = ErrReaderReleased
1019 }
1020
1021
1022
1023
1024
1025 func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
1026 if f == nil {
1027 return nil, errors.New("leveldb/table: nil file")
1028 }
1029
1030 r := &Reader{
1031 fd: fd,
1032 reader: f,
1033 cache: cache,
1034 bpool: bpool,
1035 o: o,
1036 cmp: o.GetComparer(),
1037 verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
1038 }
1039
1040 if size < footerLen {
1041 r.err = r.newErrCorrupted(0, size, "table", "too small")
1042 return r, nil
1043 }
1044
1045 footerPos := size - footerLen
1046 var footer [footerLen]byte
1047 if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
1048 return nil, err
1049 }
1050 if string(footer[footerLen-len(magic):footerLen]) != magic {
1051 r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")
1052 return r, nil
1053 }
1054
1055 var n int
1056
1057 r.metaBH, n = decodeBlockHandle(footer[:])
1058 if n == 0 {
1059 r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")
1060 return r, nil
1061 }
1062
1063
1064 r.indexBH, n = decodeBlockHandle(footer[n:])
1065 if n == 0 {
1066 r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")
1067 return r, nil
1068 }
1069
1070
1071 metaBlock, err := r.readBlock(r.metaBH, true)
1072 if err != nil {
1073 if errors.IsCorrupted(err) {
1074 r.err = err
1075 return r, nil
1076 }
1077 return nil, err
1078 }
1079
1080
1081 r.dataEnd = int64(r.metaBH.offset)
1082
1083
1084 metaIter := r.newBlockIter(metaBlock, nil, nil, true)
1085 for metaIter.Next() {
1086 key := string(metaIter.Key())
1087 if !strings.HasPrefix(key, "filter.") {
1088 continue
1089 }
1090 fn := key[7:]
1091 if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
1092 r.filter = f0
1093 } else {
1094 for _, f0 := range o.GetAltFilters() {
1095 if f0.Name() == fn {
1096 r.filter = f0
1097 break
1098 }
1099 }
1100 }
1101 if r.filter != nil {
1102 filterBH, n := decodeBlockHandle(metaIter.Value())
1103 if n == 0 {
1104 continue
1105 }
1106 r.filterBH = filterBH
1107
1108 r.dataEnd = int64(filterBH.offset)
1109 break
1110 }
1111 }
1112 metaIter.Release()
1113 metaBlock.Release()
1114
1115
1116 if cache == nil {
1117 r.indexBlock, err = r.readBlock(r.indexBH, true)
1118 if err != nil {
1119 if errors.IsCorrupted(err) {
1120 r.err = err
1121 return r, nil
1122 }
1123 return nil, err
1124 }
1125 if r.filter != nil {
1126 r.filterBlock, err = r.readFilterBlock(r.filterBH)
1127 if err != nil {
1128 if !errors.IsCorrupted(err) {
1129 return nil, err
1130 }
1131
1132
1133 r.filter = nil
1134 }
1135 }
1136 }
1137
1138 return r, nil
1139 }
1140
View as plain text