...

Source file src/github.com/syndtr/goleveldb/leveldb/table.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"bytes"
    11  	"fmt"
    12  	"sort"
    13  	"sync/atomic"
    14  
    15  	"github.com/syndtr/goleveldb/leveldb/cache"
    16  	"github.com/syndtr/goleveldb/leveldb/iterator"
    17  	"github.com/syndtr/goleveldb/leveldb/opt"
    18  	"github.com/syndtr/goleveldb/leveldb/storage"
    19  	"github.com/syndtr/goleveldb/leveldb/table"
    20  	"github.com/syndtr/goleveldb/leveldb/util"
    21  )
    22  
    23  // tFile holds basic information about a table.
    24  type tFile struct {
    25  	fd         storage.FileDesc
    26  	seekLeft   int32
    27  	size       int64
    28  	imin, imax internalKey
    29  }
    30  
    31  // Returns true if given key is after largest key of this table.
    32  func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
    33  	return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
    34  }
    35  
    36  // Returns true if given key is before smallest key of this table.
    37  func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
    38  	return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
    39  }
    40  
    41  // Returns true if given key range overlaps with this table key range.
    42  func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
    43  	return !t.after(icmp, umin) && !t.before(icmp, umax)
    44  }
    45  
    46  // Consumes one seek and returns the number of seeks left.
    47  func (t *tFile) consumeSeek() int32 {
    48  	return atomic.AddInt32(&t.seekLeft, -1)
    49  }
    50  
    51  // Creates new tFile.
    52  func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
    53  	f := &tFile{
    54  		fd:   fd,
    55  		size: size,
    56  		imin: imin,
    57  		imax: imax,
    58  	}
    59  
    60  	// We arrange to automatically compact this file after
    61  	// a certain number of seeks.  Let's assume:
    62  	//   (1) One seek costs 10ms
    63  	//   (2) Writing or reading 1MB costs 10ms (100MB/s)
    64  	//   (3) A compaction of 1MB does 25MB of IO:
    65  	//         1MB read from this level
    66  	//         10-12MB read from next level (boundaries may be misaligned)
    67  	//         10-12MB written to next level
    68  	// This implies that 25 seeks cost the same as the compaction
    69  	// of 1MB of data.  I.e., one seek costs approximately the
    70  	// same as the compaction of 40KB of data.  We are a little
    71  	// conservative and allow approximately one seek for every 16KB
    72  	// of data before triggering a compaction.
    73  	f.seekLeft = int32(size / 16384)
    74  	if f.seekLeft < 100 {
    75  		f.seekLeft = 100
    76  	}
    77  
    78  	return f
    79  }
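
The comment above works out to roughly one allowed seek per 16KiB of table data, with a floor of 100 seeks per table. As a quick illustration of that arithmetic, here is a minimal standalone sketch; seekAllowance is a hypothetical helper name, not part of the package:

	package main

	import "fmt"

	// seekAllowance mirrors the heuristic used in newTableFile above:
	// allow roughly one seek per 16KiB of table data, but never fewer
	// than 100 seeks before a seek-triggered compaction is considered.
	func seekAllowance(size int64) int32 {
		allowance := int32(size / 16384)
		if allowance < 100 {
			allowance = 100
		}
		return allowance
	}

	func main() {
		fmt.Println(seekAllowance(2 << 20)) // 2 MiB table -> 128 seeks
		fmt.Println(seekAllowance(1 << 20)) // 1 MiB table -> 64, clamped to 100
	}
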
    80  
    81  func tableFileFromRecord(r atRecord) *tFile {
    82  	return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
    83  }
    84  
    85  // tFiles holds multiple tFile.
    86  type tFiles []*tFile
    87  
    88  func (tf tFiles) Len() int      { return len(tf) }
    89  func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
    90  
    91  // Returns true if the smallest key of table i is less than that of table j.
    92  // This is used for sorting tables by key in ascending order.
    93  func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
    94  	a, b := tf[i], tf[j]
    95  	n := icmp.Compare(a.imin, b.imin)
    96  	if n == 0 {
    97  		return a.fd.Num < b.fd.Num
    98  	}
    99  	return n < 0
   100  }
   101  
   102  // Returns true if the file number of table i is greater than that of table j.
   103  // This is used for sorting tables by file number in descending order.
   104  func (tf tFiles) lessByNum(i, j int) bool {
   105  	return tf[i].fd.Num > tf[j].fd.Num
   106  }
   107  
   108  // Sorts tables by key in ascending order.
   109  func (tf tFiles) sortByKey(icmp *iComparer) {
   110  	sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
   111  }
   112  
   113  // Sorts tables by file number in descending order.
   114  func (tf tFiles) sortByNum() {
   115  	sort.Sort(&tFilesSortByNum{tFiles: tf})
   116  }
   117  
   118  // Returns the total size of all tables.
   119  func (tf tFiles) size() (sum int64) {
   120  	for _, t := range tf {
   121  		sum += t.size
   122  	}
   123  	return sum
   124  }
   125  
   126  // Searches for the smallest index of the tables whose smallest
   127  // key is greater than or equal to the given key.
   128  func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
   129  	return sort.Search(len(tf), func(i int) bool {
   130  		return icmp.Compare(tf[i].imin, ikey) >= 0
   131  	})
   132  }
   133  
   134  // Searches for the smallest index of the tables whose largest
   135  // key is greater than or equal to the given key.
   136  func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
   137  	return sort.Search(len(tf), func(i int) bool {
   138  		return icmp.Compare(tf[i].imax, ikey) >= 0
   139  	})
   140  }
   141  
   142  // Searches for the smallest index of the tables whose file number
   143  // is smaller than the given number.
   144  func (tf tFiles) searchNumLess(num int64) int {
   145  	return sort.Search(len(tf), func(i int) bool {
   146  		return tf[i].fd.Num < num
   147  	})
   148  }
   149  
   150  // Searches for the smallest index of the tables whose smallest
   151  // key is greater than the given key.
   152  func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int {
   153  	return sort.Search(len(tf), func(i int) bool {
   154  		return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0
   155  	})
   156  }
   157  
   158  // Searches for the smallest index of the tables whose largest
   159  // key is greater than the given key.
   160  func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int {
   161  	return sort.Search(len(tf), func(i int) bool {
   162  		return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0
   163  	})
   164  }
   165  
   166  // Returns true if the given key range overlaps with the key range of
   167  // one or more tables. If unsorted is true then binary search will not be used.
   168  func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
   169  	if unsorted {
   170  		// Check against all files.
   171  		for _, t := range tf {
   172  			if t.overlaps(icmp, umin, umax) {
   173  				return true
   174  			}
   175  		}
   176  		return false
   177  	}
   178  
   179  	i := 0
   180  	if len(umin) > 0 {
   181  		// Find the earliest possible internal key for min.
   182  		i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
   183  	}
   184  	if i >= len(tf) {
   185  		// Beginning of range is after all files, so no overlap.
   186  		return false
   187  	}
   188  	return !tf[i].before(icmp, umax)
   189  }
   190  
   191  // Returns tables whose key range overlaps with the given key range.
   192  // The range will be expanded if a ukey is found to hop across tables.
   193  // If overlapped is true then the search will be restarted if umax
   194  // is expanded.
   195  // The dst content will be overwritten.
   196  func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
   197  	// Short circuit if tf is empty
   198  	if len(tf) == 0 {
   199  		return nil
   200  	}
   201  	// For non-zero levels, there is no ukey hop across at all.
   202  	// What's more, the files in these levels are strictly sorted and
   203  	// non-overlapping, so binary search can be used instead of a full traversal.
   204  	if !overlapped {
   205  		var begin, end int
   206  		// Determine the begin index of the overlapped file
   207  		if umin != nil {
   208  			index := tf.searchMinUkey(icmp, umin)
   209  			if index == 0 {
   210  				begin = 0
   211  			} else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 {
   212  				// The min ukey overlaps with the index-1 file, expand it.
   213  				begin = index - 1
   214  			} else {
   215  				begin = index
   216  			}
   217  		}
   218  		// Determine the end index of the overlapped file
   219  		if umax != nil {
   220  			index := tf.searchMaxUkey(icmp, umax)
   221  			if index == len(tf) {
   222  				end = len(tf)
   223  			} else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 {
   224  				// The max ukey overlaps with the index file, expand it.
   225  				end = index + 1
   226  			} else {
   227  				end = index
   228  			}
   229  		} else {
   230  			end = len(tf)
   231  		}
   232  		// Ensure the overlapped file indexes are valid.
   233  		if begin >= end {
   234  			return nil
   235  		}
   236  		dst = make([]*tFile, end-begin)
   237  		copy(dst, tf[begin:end])
   238  		return dst
   239  	}
   240  
   241  	dst = dst[:0]
   242  	for i := 0; i < len(tf); {
   243  		t := tf[i]
   244  		if t.overlaps(icmp, umin, umax) {
   245  			if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
   246  				umin = t.imin.ukey()
   247  				dst = dst[:0]
   248  				i = 0
   249  				continue
   250  			} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
   251  				umax = t.imax.ukey()
   252  				// Restart search if it is overlapped.
   253  				dst = dst[:0]
   254  				i = 0
   255  				continue
   256  			}
   257  
   258  			dst = append(dst, t)
   259  		}
   260  		i++
   261  	}
   262  
   263  	return dst
   264  }
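
For the non-overlapped branch above, the begin/end bounds come from sort.Search over the sorted, disjoint file list. Below is a simplified standalone sketch of the same technique, using plain user keys and hypothetical names (span, overlapRange) instead of the package's internal-key machinery:

	package main

	import (
		"bytes"
		"fmt"
		"sort"
	)

	// span is a simplified stand-in for tFile: just the smallest and
	// largest user key of one table.
	type span struct{ min, max []byte }

	// overlapRange returns the half-open index range [begin, end) of spans
	// whose key range overlaps [umin, umax], assuming spans are sorted by
	// key and mutually disjoint (as files in non-zero levels are).
	func overlapRange(spans []span, umin, umax []byte) (begin, end int) {
		begin = sort.Search(len(spans), func(i int) bool {
			// First span whose largest key reaches umin.
			return bytes.Compare(spans[i].max, umin) >= 0
		})
		end = sort.Search(len(spans), func(i int) bool {
			// First span whose smallest key is already past umax.
			return bytes.Compare(spans[i].min, umax) > 0
		})
		return begin, end
	}

	func main() {
		spans := []span{
			{[]byte("a"), []byte("c")},
			{[]byte("d"), []byte("f")},
			{[]byte("g"), []byte("k")},
		}
		begin, end := overlapRange(spans, []byte("e"), []byte("h"))
		fmt.Println(begin, end) // 1 3: the second and third spans overlap ["e", "h"]
	}

Collapsing the search-and-adjust step of getOverlaps into a single predicate like this is only valid because files in non-zero levels are sorted and non-overlapping.
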
   265  
   266  // Returns tables key range.
   267  func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
   268  	for i, t := range tf {
   269  		if i == 0 {
   270  			imin, imax = t.imin, t.imax
   271  			continue
   272  		}
   273  		if icmp.Compare(t.imin, imin) < 0 {
   274  			imin = t.imin
   275  		}
   276  		if icmp.Compare(t.imax, imax) > 0 {
   277  			imax = t.imax
   278  		}
   279  	}
   280  
   281  	return
   282  }
   283  
   284  // Creates iterator index from tables.
   285  func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
   286  	if slice != nil {
   287  		var start, limit int
   288  		if slice.Start != nil {
   289  			start = tf.searchMax(icmp, internalKey(slice.Start))
   290  		}
   291  		if slice.Limit != nil {
   292  			limit = tf.searchMin(icmp, internalKey(slice.Limit))
   293  		} else {
   294  			limit = tf.Len()
   295  		}
   296  		tf = tf[start:limit]
   297  	}
   298  	return iterator.NewArrayIndexer(&tFilesArrayIndexer{
   299  		tFiles: tf,
   300  		tops:   tops,
   301  		icmp:   icmp,
   302  		slice:  slice,
   303  		ro:     ro,
   304  	})
   305  }
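
At the public API level, the slice narrowed here ultimately comes from the *util.Range passed to DB.NewIterator. A minimal usage sketch (the database path is illustrative; error handling kept to the essentials):

	package main

	import (
		"fmt"
		"log"

		"github.com/syndtr/goleveldb/leveldb"
		"github.com/syndtr/goleveldb/leveldb/util"
	)

	func main() {
		db, err := leveldb.OpenFile("/tmp/example-db", nil) // path is illustrative
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()

		// Keys in ["bar", "foo"); the range is narrowed down to the relevant
		// tables by searchMax/searchMin, as in newIndexIterator above.
		iter := db.NewIterator(&util.Range{Start: []byte("bar"), Limit: []byte("foo")}, nil)
		defer iter.Release()
		for iter.Next() {
			fmt.Printf("%s = %s\n", iter.Key(), iter.Value())
		}
		if err := iter.Error(); err != nil {
			log.Fatal(err)
		}
	}
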
   306  
   307  // Tables iterator index.
   308  type tFilesArrayIndexer struct {
   309  	tFiles
   310  	tops  *tOps
   311  	icmp  *iComparer
   312  	slice *util.Range
   313  	ro    *opt.ReadOptions
   314  }
   315  
   316  func (a *tFilesArrayIndexer) Search(key []byte) int {
   317  	return a.searchMax(a.icmp, internalKey(key))
   318  }
   319  
   320  func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
   321  	if i == 0 || i == a.Len()-1 {
   322  		return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
   323  	}
   324  	return a.tops.newIterator(a.tFiles[i], nil, a.ro)
   325  }
   326  
   327  // Helper type for sortByKey.
   328  type tFilesSortByKey struct {
   329  	tFiles
   330  	icmp *iComparer
   331  }
   332  
   333  func (x *tFilesSortByKey) Less(i, j int) bool {
   334  	return x.lessByKey(x.icmp, i, j)
   335  }
   336  
   337  // Helper type for sortByNum.
   338  type tFilesSortByNum struct {
   339  	tFiles
   340  }
   341  
   342  func (x *tFilesSortByNum) Less(i, j int) bool {
   343  	return x.lessByNum(i, j)
   344  }
   345  
   346  // Table operations.
   347  type tOps struct {
   348  	s            *session
   349  	noSync       bool
   350  	evictRemoved bool
   351  	fileCache    *cache.Cache
   352  	blockCache   *cache.Cache
   353  	blockBuffer  *util.BufferPool
   354  }
   355  
   356  // Creates an empty table and returns table writer.
   357  func (t *tOps) create(tSize int) (*tWriter, error) {
   358  	fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
   359  	fw, err := t.s.stor.Create(fd)
   360  	if err != nil {
   361  		return nil, err
   362  	}
   363  	return &tWriter{
   364  		t:  t,
   365  		fd: fd,
   366  		w:  fw,
   367  		tw: table.NewWriter(fw, t.s.o.Options, t.blockBuffer, tSize),
   368  	}, nil
   369  }
   370  
   371  // Builds table from src iterator.
   372  func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
   373  	w, err := t.create(0)
   374  	if err != nil {
   375  		return
   376  	}
   377  
   378  	defer func() {
   379  		if err != nil {
   380  			if derr := w.drop(); derr != nil {
   381  				err = fmt.Errorf("error createFrom (%v); error dropping (%v)", err, derr)
   382  			}
   383  		}
   384  	}()
   385  
   386  	for src.Next() {
   387  		err = w.append(src.Key(), src.Value())
   388  		if err != nil {
   389  			return
   390  		}
   391  	}
   392  	err = src.Error()
   393  	if err != nil {
   394  		return
   395  	}
   396  
   397  	n = w.tw.EntriesLen()
   398  	f, err = w.finish()
   399  	return
   400  }
   401  
   402  // Opens table. It returns a cache handle, which should
   403  // be released after use.
   404  func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
   405  	ch = t.fileCache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
   406  		var r storage.Reader
   407  		r, err = t.s.stor.Open(f.fd)
   408  		if err != nil {
   409  			return 0, nil
   410  		}
   411  
   412  		var blockCache *cache.NamespaceGetter
   413  		if t.blockCache != nil {
   414  			blockCache = &cache.NamespaceGetter{Cache: t.blockCache, NS: uint64(f.fd.Num)}
   415  		}
   416  
   417  		var tr *table.Reader
   418  		tr, err = table.NewReader(r, f.size, f.fd, blockCache, t.blockBuffer, t.s.o.Options)
   419  		if err != nil {
   420  			_ = r.Close()
   421  			return 0, nil
   422  		}
   423  		return 1, tr
   424  
   425  	})
   426  	if ch == nil && err == nil {
   427  		err = ErrClosed
   428  	}
   429  	return
   430  }
   431  
   432  // Finds key/value pair whose key is greater than or equal to the
   433  // given key.
   434  func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
   435  	ch, err := t.open(f)
   436  	if err != nil {
   437  		return nil, nil, err
   438  	}
   439  	defer ch.Release()
   440  	return ch.Value().(*table.Reader).Find(key, true, ro)
   441  }
   442  
   443  // Finds key that is greater than or equal to the given key.
   444  func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
   445  	ch, err := t.open(f)
   446  	if err != nil {
   447  		return nil, err
   448  	}
   449  	defer ch.Release()
   450  	return ch.Value().(*table.Reader).FindKey(key, true, ro)
   451  }
   452  
   453  // Returns approximate offset of the given key.
   454  func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
   455  	ch, err := t.open(f)
   456  	if err != nil {
   457  		return
   458  	}
   459  	defer ch.Release()
   460  	return ch.Value().(*table.Reader).OffsetOf(key)
   461  }
   462  
   463  // Creates an iterator from the given table.
   464  func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
   465  	ch, err := t.open(f)
   466  	if err != nil {
   467  		return iterator.NewEmptyIterator(err)
   468  	}
   469  	iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
   470  	iter.SetReleaser(ch)
   471  	return iter
   472  }
   473  
   474  // Removes the table from persistent storage. It waits until
   475  // no one uses the table.
   476  func (t *tOps) remove(fd storage.FileDesc) {
   477  	t.fileCache.Delete(0, uint64(fd.Num), func() {
   478  		if err := t.s.stor.Remove(fd); err != nil {
   479  			t.s.logf("table@remove removing @%d %q", fd.Num, err)
   480  		} else {
   481  			t.s.logf("table@remove removed @%d", fd.Num)
   482  		}
   483  		if t.evictRemoved && t.blockCache != nil {
   484  			t.blockCache.EvictNS(uint64(fd.Num))
   485  		}
   486  		// Try to reuse the file num; useful for discarded transactions.
   487  		t.s.reuseFileNum(fd.Num)
   488  	})
   489  }
   490  
   491  // Closes the table ops instance. It will close all tables,
   492  // regardless of whether they are still in use or not.
   493  func (t *tOps) close() {
   494  	t.fileCache.Close(true)
   495  	if t.blockCache != nil {
   496  		t.blockCache.Close(false)
   497  	}
   498  }
   499  
   500  // Creates new initialized table ops instance.
   501  func newTableOps(s *session) *tOps {
   502  	var (
   503  		fileCacher  cache.Cacher
   504  		blockCache  *cache.Cache
   505  		blockBuffer *util.BufferPool
   506  	)
   507  	if s.o.GetOpenFilesCacheCapacity() > 0 {
   508  		fileCacher = s.o.GetOpenFilesCacher().New(s.o.GetOpenFilesCacheCapacity())
   509  	}
   510  	if !s.o.GetDisableBlockCache() {
   511  		var blockCacher cache.Cacher
   512  		if s.o.GetBlockCacheCapacity() > 0 {
   513  			blockCacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
   514  		}
   515  		blockCache = cache.NewCache(blockCacher)
   516  	}
   517  	if !s.o.GetDisableBufferPool() {
   518  		blockBuffer = util.NewBufferPool(s.o.GetBlockSize() + 5)
   519  	}
   520  	return &tOps{
   521  		s:            s,
   522  		noSync:       s.o.GetNoSync(),
   523  		evictRemoved: s.o.GetBlockCacheEvictRemoved(),
   524  		fileCache:    cache.NewCache(fileCacher),
   525  		blockCache:   blockCache,
   526  		blockBuffer:  blockBuffer,
   527  	}
   528  }
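
The capacities and switches consumed by newTableOps come straight from opt.Options. A hedged configuration sketch showing the relevant fields (the values and path are arbitrary examples):

	package main

	import (
		"log"

		"github.com/syndtr/goleveldb/leveldb"
		"github.com/syndtr/goleveldb/leveldb/opt"
	)

	func main() {
		o := &opt.Options{
			OpenFilesCacheCapacity: 200,          // sizes the file (table reader) cache
			BlockCacheCapacity:     16 * opt.MiB, // sizes the shared block cache
			BlockCacheEvictRemoved: true,         // evict cached blocks of removed tables
			DisableBufferPool:      false,        // keep the block buffer pool
			NoSync:                 false,        // sync table files when finished
		}
		db, err := leveldb.OpenFile("/tmp/example-db", o) // path is illustrative
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()
	}
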
   529  
   530  // tWriter wraps the table writer. It keeps track of the file descriptor
   531  // and the added key range.
   532  type tWriter struct {
   533  	t *tOps
   534  
   535  	fd storage.FileDesc
   536  	w  storage.Writer
   537  	tw *table.Writer
   538  
   539  	first, last []byte
   540  }
   541  
   542  // Appends a key/value pair to the table.
   543  func (w *tWriter) append(key, value []byte) error {
   544  	if w.first == nil {
   545  		w.first = append([]byte(nil), key...)
   546  	}
   547  	w.last = append(w.last[:0], key...)
   548  	return w.tw.Append(key, value)
   549  }
   550  
   551  // Returns true if the table is empty.
   552  func (w *tWriter) empty() bool {
   553  	return w.first == nil
   554  }
   555  
   556  // Closes the storage.Writer.
   557  func (w *tWriter) close() error {
   558  	if w.w != nil {
   559  		if err := w.w.Close(); err != nil {
   560  			return err
   561  		}
   562  		w.w = nil
   563  	}
   564  	return nil
   565  }
   566  
   567  // Finalizes the table and returns table file.
   568  func (w *tWriter) finish() (f *tFile, err error) {
   569  	defer func() {
   570  		if cerr := w.close(); cerr != nil {
   571  			if err == nil {
   572  				err = cerr
   573  			} else {
   574  				err = fmt.Errorf("error finishing table (%v); error closing file (%v)", err, cerr)
   575  			}
   576  		}
   577  	}()
   578  	err = w.tw.Close()
   579  	if err != nil {
   580  		return
   581  	}
   582  	if !w.t.noSync {
   583  		err = w.w.Sync()
   584  		if err != nil {
   585  			return
   586  		}
   587  	}
   588  	f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
   589  	return
   590  }
   591  
   592  // Drops the table.
   593  func (w *tWriter) drop() error {
   594  	if err := w.close(); err != nil {
   595  		return err
   596  	}
   597  	w.tw = nil
   598  	w.first = nil
   599  	w.last = nil
   600  	if err := w.t.s.stor.Remove(w.fd); err != nil {
   601  		return err
   602  	}
   603  	w.t.s.reuseFileNum(w.fd.Num)
   604  	return nil
   605  }
   606  
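
As a closing illustration of the tWriter flow above (append keys in order, then finish), the exported table package can be driven directly with the same four-argument NewWriter used by tOps.create. A minimal sketch; it assumes that a nil *opt.Options and a nil buffer pool fall back to defaults, and the output path is illustrative:

	package main

	import (
		"log"
		"os"

		"github.com/syndtr/goleveldb/leveldb/table"
	)

	func main() {
		f, err := os.Create("/tmp/example.ldb") // path is illustrative
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()

		// Same four-argument constructor used by tOps.create above.
		// Assumption: nil options and a nil buffer pool fall back to defaults.
		tw := table.NewWriter(f, nil, nil, 0)
		// As with tWriter.append, keys must be added in ascending order.
		if err := tw.Append([]byte("alpha"), []byte("1")); err != nil {
			log.Fatal(err)
		}
		if err := tw.Append([]byte("beta"), []byte("2")); err != nil {
			log.Fatal(err)
		}
		if err := tw.Close(); err != nil {
			log.Fatal(err)
		}
		log.Printf("wrote %d entries, %d bytes", tw.EntriesLen(), tw.BytesLen())
	}
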
