...

Source file src/github.com/syndtr/goleveldb/leveldb/db_iter.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"math/rand"
    11  	"runtime"
    12  	"sync"
    13  	"sync/atomic"
    14  
    15  	"github.com/syndtr/goleveldb/leveldb/iterator"
    16  	"github.com/syndtr/goleveldb/leveldb/opt"
    17  	"github.com/syndtr/goleveldb/leveldb/util"
    18  )
    19  
    20  type memdbReleaser struct {
    21  	once sync.Once
    22  	m    *memDB
    23  }
    24  
    25  func (mr *memdbReleaser) Release() {
    26  	mr.once.Do(func() {
    27  		mr.m.decref()
    28  	})
    29  }
    30  
    31  func (db *DB) newRawIterator(auxm *memDB, auxt tFiles, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
    32  	strict := opt.GetStrict(db.s.o.Options, ro, opt.StrictReader)
    33  	em, fm := db.getMems()
    34  	v := db.s.version()
    35  
    36  	tableIts := v.getIterators(slice, ro)
    37  	n := len(tableIts) + len(auxt) + 3
    38  	its := make([]iterator.Iterator, 0, n)
    39  
    40  	if auxm != nil {
    41  		ami := auxm.NewIterator(slice)
    42  		ami.SetReleaser(&memdbReleaser{m: auxm})
    43  		its = append(its, ami)
    44  	}
    45  	for _, t := range auxt {
    46  		its = append(its, v.s.tops.newIterator(t, slice, ro))
    47  	}
    48  
    49  	emi := em.NewIterator(slice)
    50  	emi.SetReleaser(&memdbReleaser{m: em})
    51  	its = append(its, emi)
    52  	if fm != nil {
    53  		fmi := fm.NewIterator(slice)
    54  		fmi.SetReleaser(&memdbReleaser{m: fm})
    55  		its = append(its, fmi)
    56  	}
    57  	its = append(its, tableIts...)
    58  	mi := iterator.NewMergedIterator(its, db.s.icmp, strict)
    59  	mi.SetReleaser(&versionReleaser{v: v})
    60  	return mi
    61  }
    62  
    63  func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter {
    64  	var islice *util.Range
    65  	if slice != nil {
    66  		islice = &util.Range{}
    67  		if slice.Start != nil {
    68  			islice.Start = makeInternalKey(nil, slice.Start, keyMaxSeq, keyTypeSeek)
    69  		}
    70  		if slice.Limit != nil {
    71  			islice.Limit = makeInternalKey(nil, slice.Limit, keyMaxSeq, keyTypeSeek)
    72  		}
    73  	}
    74  	rawIter := db.newRawIterator(auxm, auxt, islice, ro)
    75  	iter := &dbIter{
    76  		db:              db,
    77  		icmp:            db.s.icmp,
    78  		iter:            rawIter,
    79  		seq:             seq,
    80  		strict:          opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
    81  		disableSampling: db.s.o.GetDisableSeeksCompaction() || db.s.o.GetIteratorSamplingRate() <= 0,
    82  		key:             make([]byte, 0),
    83  		value:           make([]byte, 0),
    84  	}
    85  	if !iter.disableSampling {
    86  		iter.samplingGap = db.iterSamplingRate()
    87  	}
    88  	atomic.AddInt32(&db.aliveIters, 1)
    89  	runtime.SetFinalizer(iter, (*dbIter).Release)
    90  	return iter
    91  }
    92  
    93  func (db *DB) iterSamplingRate() int {
    94  	return rand.Intn(2 * db.s.o.GetIteratorSamplingRate())
    95  }
    96  
    97  type dir int
    98  
    99  const (
   100  	dirReleased dir = iota - 1
   101  	dirSOI
   102  	dirEOI
   103  	dirBackward
   104  	dirForward
   105  )
   106  
   107  // dbIter represent an interator states over a database session.
   108  type dbIter struct {
   109  	db              *DB
   110  	icmp            *iComparer
   111  	iter            iterator.Iterator
   112  	seq             uint64
   113  	strict          bool
   114  	disableSampling bool
   115  
   116  	samplingGap int
   117  	dir         dir
   118  	key         []byte
   119  	value       []byte
   120  	err         error
   121  	releaser    util.Releaser
   122  }
   123  
   124  func (i *dbIter) sampleSeek() {
   125  	if i.disableSampling {
   126  		return
   127  	}
   128  
   129  	ikey := i.iter.Key()
   130  	i.samplingGap -= len(ikey) + len(i.iter.Value())
   131  	for i.samplingGap < 0 {
   132  		i.samplingGap += i.db.iterSamplingRate()
   133  		i.db.sampleSeek(ikey)
   134  	}
   135  }
   136  
   137  func (i *dbIter) setErr(err error) {
   138  	i.err = err
   139  	i.key = nil
   140  	i.value = nil
   141  }
   142  
   143  func (i *dbIter) iterErr() {
   144  	if err := i.iter.Error(); err != nil {
   145  		i.setErr(err)
   146  	}
   147  }
   148  
   149  func (i *dbIter) Valid() bool {
   150  	return i.err == nil && i.dir > dirEOI
   151  }
   152  
   153  func (i *dbIter) First() bool {
   154  	if i.err != nil {
   155  		return false
   156  	} else if i.dir == dirReleased {
   157  		i.err = ErrIterReleased
   158  		return false
   159  	}
   160  
   161  	if i.iter.First() {
   162  		i.dir = dirSOI
   163  		return i.next()
   164  	}
   165  	i.dir = dirEOI
   166  	i.iterErr()
   167  	return false
   168  }
   169  
   170  func (i *dbIter) Last() bool {
   171  	if i.err != nil {
   172  		return false
   173  	} else if i.dir == dirReleased {
   174  		i.err = ErrIterReleased
   175  		return false
   176  	}
   177  
   178  	if i.iter.Last() {
   179  		return i.prev()
   180  	}
   181  	i.dir = dirSOI
   182  	i.iterErr()
   183  	return false
   184  }
   185  
   186  func (i *dbIter) Seek(key []byte) bool {
   187  	if i.err != nil {
   188  		return false
   189  	} else if i.dir == dirReleased {
   190  		i.err = ErrIterReleased
   191  		return false
   192  	}
   193  
   194  	ikey := makeInternalKey(nil, key, i.seq, keyTypeSeek)
   195  	if i.iter.Seek(ikey) {
   196  		i.dir = dirSOI
   197  		return i.next()
   198  	}
   199  	i.dir = dirEOI
   200  	i.iterErr()
   201  	return false
   202  }
   203  
   204  func (i *dbIter) next() bool {
   205  	for {
   206  		if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
   207  			i.sampleSeek()
   208  			if seq <= i.seq {
   209  				switch kt {
   210  				case keyTypeDel:
   211  					// Skip deleted key.
   212  					i.key = append(i.key[:0], ukey...)
   213  					i.dir = dirForward
   214  				case keyTypeVal:
   215  					if i.dir == dirSOI || i.icmp.uCompare(ukey, i.key) > 0 {
   216  						i.key = append(i.key[:0], ukey...)
   217  						i.value = append(i.value[:0], i.iter.Value()...)
   218  						i.dir = dirForward
   219  						return true
   220  					}
   221  				}
   222  			}
   223  		} else if i.strict {
   224  			i.setErr(kerr)
   225  			break
   226  		}
   227  		if !i.iter.Next() {
   228  			i.dir = dirEOI
   229  			i.iterErr()
   230  			break
   231  		}
   232  	}
   233  	return false
   234  }
   235  
   236  func (i *dbIter) Next() bool {
   237  	if i.dir == dirEOI || i.err != nil {
   238  		return false
   239  	} else if i.dir == dirReleased {
   240  		i.err = ErrIterReleased
   241  		return false
   242  	}
   243  
   244  	if !i.iter.Next() || (i.dir == dirBackward && !i.iter.Next()) {
   245  		i.dir = dirEOI
   246  		i.iterErr()
   247  		return false
   248  	}
   249  	return i.next()
   250  }
   251  
   252  func (i *dbIter) prev() bool {
   253  	i.dir = dirBackward
   254  	del := true
   255  	if i.iter.Valid() {
   256  		for {
   257  			if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
   258  				i.sampleSeek()
   259  				if seq <= i.seq {
   260  					if !del && i.icmp.uCompare(ukey, i.key) < 0 {
   261  						return true
   262  					}
   263  					del = (kt == keyTypeDel)
   264  					if !del {
   265  						i.key = append(i.key[:0], ukey...)
   266  						i.value = append(i.value[:0], i.iter.Value()...)
   267  					}
   268  				}
   269  			} else if i.strict {
   270  				i.setErr(kerr)
   271  				return false
   272  			}
   273  			if !i.iter.Prev() {
   274  				break
   275  			}
   276  		}
   277  	}
   278  	if del {
   279  		i.dir = dirSOI
   280  		i.iterErr()
   281  		return false
   282  	}
   283  	return true
   284  }
   285  
   286  func (i *dbIter) Prev() bool {
   287  	if i.dir == dirSOI || i.err != nil {
   288  		return false
   289  	} else if i.dir == dirReleased {
   290  		i.err = ErrIterReleased
   291  		return false
   292  	}
   293  
   294  	switch i.dir {
   295  	case dirEOI:
   296  		return i.Last()
   297  	case dirForward:
   298  		for i.iter.Prev() {
   299  			if ukey, _, _, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
   300  				i.sampleSeek()
   301  				if i.icmp.uCompare(ukey, i.key) < 0 {
   302  					goto cont
   303  				}
   304  			} else if i.strict {
   305  				i.setErr(kerr)
   306  				return false
   307  			}
   308  		}
   309  		i.dir = dirSOI
   310  		i.iterErr()
   311  		return false
   312  	}
   313  
   314  cont:
   315  	return i.prev()
   316  }
   317  
   318  func (i *dbIter) Key() []byte {
   319  	if i.err != nil || i.dir <= dirEOI {
   320  		return nil
   321  	}
   322  	return i.key
   323  }
   324  
   325  func (i *dbIter) Value() []byte {
   326  	if i.err != nil || i.dir <= dirEOI {
   327  		return nil
   328  	}
   329  	return i.value
   330  }
   331  
   332  func (i *dbIter) Release() {
   333  	if i.dir != dirReleased {
   334  		// Clear the finalizer.
   335  		runtime.SetFinalizer(i, nil)
   336  
   337  		if i.releaser != nil {
   338  			i.releaser.Release()
   339  			i.releaser = nil
   340  		}
   341  
   342  		i.dir = dirReleased
   343  		i.key = nil
   344  		i.value = nil
   345  		i.iter.Release()
   346  		i.iter = nil
   347  		atomic.AddInt32(&i.db.aliveIters, -1)
   348  		i.db = nil
   349  	}
   350  }
   351  
   352  func (i *dbIter) SetReleaser(releaser util.Releaser) {
   353  	if i.dir == dirReleased {
   354  		panic(util.ErrReleased)
   355  	}
   356  	if i.releaser != nil && releaser != nil {
   357  		panic(util.ErrHasReleaser)
   358  	}
   359  	i.releaser = releaser
   360  }
   361  
   362  func (i *dbIter) Error() error {
   363  	return i.err
   364  }
   365  

View as plain text