...

Source file src/github.com/syndtr/goleveldb/leveldb/version.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"fmt"
    11  	"sync/atomic"
    12  	"time"
    13  	"unsafe"
    14  
    15  	"github.com/syndtr/goleveldb/leveldb/iterator"
    16  	"github.com/syndtr/goleveldb/leveldb/opt"
    17  	"github.com/syndtr/goleveldb/leveldb/util"
    18  )
    19  
// tSet pairs a table file with the level it resides at. It is used to
// record a seek-compaction candidate in version.cSeek.
type tSet struct {
	level int
	table *tFile
}
    24  
// version is a snapshot of the database's table files, organized per
// level. The levels slice is treated as immutable once the version is
// built; versions are reference counted via ref/released.
type version struct {
	id int64 // unique monotonous increasing version id
	s  *session

	// Table files per level. Immutable after the version is finished.
	levels []tFiles

	// Level that should be compacted next and its compaction score.
	// Score < 1 means compaction is not strictly needed. These fields
	// are initialized by computeCompaction()
	cLevel int
	cScore float64

	// Seek-compaction candidate (*tSet), installed atomically the first
	// time a table exhausts its allowed seeks.
	cSeek unsafe.Pointer

	closing  bool
	ref      int  // reference count; guarded by s.vmu
	released bool // set once the last reference has been dropped
}
    43  
    44  // newVersion creates a new version with an unique monotonous increasing id.
    45  func newVersion(s *session) *version {
    46  	id := atomic.AddInt64(&s.ntVersionID, 1)
    47  	nv := &version{s: s, id: id - 1}
    48  	return nv
    49  }
    50  
// incref increments the version's reference count. On the transition
// from zero to one it hands the version's file list to the session's
// reference loop (refCh) so the table files are tracked; if the
// session is closing instead, the event is only logged. Panics if the
// version has already been released. Caller must hold s.vmu.
func (v *version) incref() {
	if v.released {
		panic("already released")
	}

	v.ref++
	if v.ref == 1 {
		select {
		case v.s.refCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
			// We can use v.levels directly here since it is immutable.
		case <-v.s.closeC:
			v.s.log("reference loop already exist")
		}
	}
}
    66  
// releaseNB decrements the reference count without taking the session
// lock (caller must hold s.vmu). When the count drops to zero the
// version is marked released and its file list is sent to the
// session's release channel so the files can be unreferenced; panics
// if the count would go negative.
func (v *version) releaseNB() {
	v.ref--
	if v.ref > 0 {
		return
	} else if v.ref < 0 {
		panic("negative version ref")
	}
	select {
	case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
		// We can use v.levels directly here since it is immutable.
	case <-v.s.closeC:
		v.s.log("reference loop already exist")
	}

	v.released = true
}
    83  
    84  func (v *version) release() {
    85  	v.s.vmu.Lock()
    86  	v.releaseNB()
    87  	v.s.vmu.Unlock()
    88  }
    89  
// walkOverlapping visits every table whose key range overlaps the user
// key of ikey. The optional aux tables are visited first (reported as
// level -1), then each real level in ascending order. f is invoked for
// each overlapping table and lf (if non-nil) once after each level;
// returning false from either stops the walk immediately.
func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int, t *tFile) bool, lf func(level int) bool) {
	ukey := ikey.ukey()

	// Aux level.
	if aux != nil {
		for _, t := range aux {
			if t.overlaps(v.s.icmp, ukey, ukey) {
				if !f(-1, t) {
					return
				}
			}
		}

		if lf != nil && !lf(-1) {
			return
		}
	}

	// Walk tables level-by-level.
	for level, tables := range v.levels {
		if len(tables) == 0 {
			continue
		}

		if level == 0 {
			// Level-0 files may overlap each other. Find all files that
			// overlap ukey.
			for _, t := range tables {
				if t.overlaps(v.s.icmp, ukey, ukey) {
					if !f(level, t) {
						return
					}
				}
			}
		} else {
			// Levels above zero are sorted and non-overlapping, so at
			// most one table can contain ukey: the first whose max key
			// is >= ikey, provided its min key is <= ukey.
			if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) {
				t := tables[i]
				if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
					if !f(level, t) {
						return
					}
				}
			}
		}

		if lf != nil && !lf(level) {
			return
		}
	}
}
   140  
// get looks up the value stored for ikey, consulting the aux tables
// first and then each level in order. It returns ErrNotFound when the
// key is absent or shadowed by a deletion marker. When noValue is
// true only the key is located and value stays nil. tcomp reports
// whether this lookup scheduled a seek-triggered compaction.
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
	if v.closing {
		return nil, false, ErrClosed
	}

	ukey := ikey.ukey()
	sampleSeeks := !v.s.o.GetDisableSeeksCompaction()

	var (
		// First table charged with a seek (tset) and whether a second
		// overlapping table confirmed the charge (tseek).
		tset  *tSet
		tseek bool

		// Level-0.
		zfound bool
		zseq   uint64
		zkt    keyType
		zval   []byte
	)

	err = ErrNotFound

	// Since entries never hop across level, finding key/value
	// in smaller level make later levels irrelevant.
	v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
		// Seek sampling: the first overlapping real-level table is the
		// candidate; hitting a second one means the first paid a
		// wasted seek.
		if sampleSeeks && level >= 0 && !tseek {
			if tset == nil {
				tset = &tSet{level, t}
			} else {
				tseek = true
			}
		}

		var (
			fikey, fval []byte
			ferr        error
		)
		if noValue {
			fikey, ferr = v.s.tops.findKey(t, ikey, ro)
		} else {
			fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
		}

		switch ferr {
		case nil:
		case ErrNotFound:
			// Not in this table; keep walking.
			return true
		default:
			err = ferr
			return false
		}

		if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil {
			if v.s.icmp.uCompare(ukey, fukey) == 0 {
				// Level <= 0 may overlaps each-other.
				if level <= 0 {
					// Keep the entry with the highest sequence number;
					// the level callback below resolves it.
					if fseq >= zseq {
						zfound = true
						zseq = fseq
						zkt = fkt
						zval = fval
					}
				} else {
					// Higher levels cannot overlap, so this entry is
					// authoritative: value or deletion marker.
					switch fkt {
					case keyTypeVal:
						value = fval
						err = nil
					case keyTypeDel:
					default:
						panic("leveldb: invalid internalKey type")
					}
					return false
				}
			}
		} else {
			err = fkerr
			return false
		}

		return true
	}, func(level int) bool {
		// After finishing the overlapping levels (aux/level-0), resolve
		// the newest entry found, if any.
		if zfound {
			switch zkt {
			case keyTypeVal:
				value = zval
				err = nil
			case keyTypeDel:
			default:
				panic("leveldb: invalid internalKey type")
			}
			return false
		}

		return true
	})

	// Register the charged table as a compaction candidate once its
	// allowed seeks are exhausted; CAS so only the first wins.
	if tseek && tset.table.consumeSeek() <= 0 {
		tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
	}

	return
}
   242  
// sampleSeek charges a simulated seek at ikey for seek-compaction
// sampling: the first overlapping table pays one seek, and — if a
// second overlapping table exists — a table that has exhausted its
// seek budget is installed as the compaction candidate via cSeek.
// Returns whether a compaction was triggered.
func (v *version) sampleSeek(ikey internalKey) (tcomp bool) {
	var tset *tSet

	v.walkOverlapping(nil, ikey, func(level int, t *tFile) bool {
		if tset == nil {
			// Remember the first overlapping table and keep walking to
			// see whether another table overlaps too.
			tset = &tSet{level, t}
			return true
		}
		if tset.table.consumeSeek() <= 0 {
			tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
		}
		return false
	}, nil)

	return
}
   259  
   260  func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) {
   261  	strict := opt.GetStrict(v.s.o.Options, ro, opt.StrictReader)
   262  	for level, tables := range v.levels {
   263  		if level == 0 {
   264  			// Merge all level zero files together since they may overlap.
   265  			for _, t := range tables {
   266  				its = append(its, v.s.tops.newIterator(t, slice, ro))
   267  			}
   268  		} else if len(tables) != 0 {
   269  			its = append(its, iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict))
   270  		}
   271  	}
   272  	return
   273  }
   274  
// newStaging returns an empty staging area based on this version, for
// building a successor version from a session record.
func (v *version) newStaging() *versionStaging {
	return &versionStaging{base: v}
}
   278  
   279  // Spawn a new version based on this version.
   280  func (v *version) spawn(r *sessionRecord, trivial bool) *version {
   281  	staging := v.newStaging()
   282  	staging.commit(r)
   283  	return staging.finish(trivial)
   284  }
   285  
   286  func (v *version) fillRecord(r *sessionRecord) {
   287  	for level, tables := range v.levels {
   288  		for _, t := range tables {
   289  			r.addTableFile(level, t)
   290  		}
   291  	}
   292  }
   293  
   294  func (v *version) tLen(level int) int {
   295  	if level < len(v.levels) {
   296  		return len(v.levels[level])
   297  	}
   298  	return 0
   299  }
   300  
// offsetOf returns the approximate byte offset of ikey within the
// database: the summed sizes of all table data ordered before ikey,
// plus the approximate offset of ikey inside the table that straddles
// it.
func (v *version) offsetOf(ikey internalKey) (n int64, err error) {
	for level, tables := range v.levels {
		for _, t := range tables {
			if v.s.icmp.Compare(t.imax, ikey) <= 0 {
				// Entire file is before "ikey", so just add the file size
				n += t.size
			} else if v.s.icmp.Compare(t.imin, ikey) > 0 {
				// Entire file is after "ikey", so ignore
				if level > 0 {
					// Files other than level 0 are sorted by meta->min, so
					// no further files in this level will contain data for
					// "ikey".
					break
				}
			} else {
				// "ikey" falls in the range for this table. Add the
				// approximate offset of "ikey" within the table.
				if m, err := v.s.tops.offsetOf(t, ikey); err == nil {
					n += m
				} else {
					return 0, err
				}
			}
		}
	}

	return
}
   329  
// pickMemdbLevel picks the level (0..maxLevel) at which a frozen memdb
// covering user-key range [umin, umax] can be placed. It returns 0 if
// level-0 overlaps the range; otherwise it descends while the next
// level has no overlap and the grandparent overlap stays within the
// configured limit. Mirrors LevelDB's PickLevelForMemTableOutput.
func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
	if maxLevel > 0 {
		if len(v.levels) == 0 {
			return maxLevel
		}
		if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) {
			var overlaps tFiles
			for ; level < maxLevel; level++ {
				// Stop if the parent level would overlap the range.
				if pLevel := level + 1; pLevel >= len(v.levels) {
					return maxLevel
				} else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) {
					break
				}
				// Stop if too much grandparent data overlaps, to avoid
				// a very expensive future compaction.
				if gpLevel := level + 2; gpLevel < len(v.levels) {
					overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
					if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) {
						break
					}
				}
			}
		}
	}
	return
}
   354  
// computeCompaction determines the level most in need of compaction
// and stores it with its score in cLevel/cScore (score >= 1 means the
// level should be compacted). It also logs per-level file counts,
// sizes and scores.
func (v *version) computeCompaction() {
	// Precomputed best level for next compaction
	bestLevel := int(-1)
	bestScore := float64(-1)

	statFiles := make([]int, len(v.levels))
	statSizes := make([]string, len(v.levels))
	statScore := make([]string, len(v.levels))
	statTotSize := int64(0)

	for level, tables := range v.levels {
		var score float64
		size := tables.size()
		if level == 0 {
			// We treat level-0 specially by bounding the number of files
			// instead of number of bytes for two reasons:
			//
			// (1) With larger write-buffer sizes, it is nice not to do too
			// many level-0 compaction.
			//
			// (2) The files in level-0 are merged on every read and
			// therefore we wish to avoid too many files when the individual
			// file size is small (perhaps because of a small write-buffer
			// setting, or very high compression ratios, or lots of
			// overwrites/deletions).
			score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger())
		} else {
			score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level))
		}

		if score > bestScore {
			bestLevel = level
			bestScore = score
		}

		statFiles[level] = len(tables)
		statSizes[level] = shortenb(size)
		statScore[level] = fmt.Sprintf("%.2f", score)
		statTotSize += size
	}

	v.cLevel = bestLevel
	v.cScore = bestScore

	v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(statTotSize), statSizes, statScore)
}
   401  
   402  func (v *version) needCompaction() bool {
   403  	return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil
   404  }
   405  
// tablesScratch accumulates per-level table additions and deletions
// (keyed by file number) while staging a new version.
type tablesScratch struct {
	added   map[int64]atRecord
	deleted map[int64]struct{}
}
   410  
// versionStaging collects the changes from one or more session records
// on top of a base version; finish() materializes the new version.
type versionStaging struct {
	base   *version
	levels []tablesScratch
}
   415  
   416  func (p *versionStaging) getScratch(level int) *tablesScratch {
   417  	if level >= len(p.levels) {
   418  		newLevels := make([]tablesScratch, level+1)
   419  		copy(newLevels, p.levels)
   420  		p.levels = newLevels
   421  	}
   422  	return &(p.levels[level])
   423  }
   424  
   425  func (p *versionStaging) commit(r *sessionRecord) {
   426  	// Deleted tables.
   427  	for _, r := range r.deletedTables {
   428  		scratch := p.getScratch(r.level)
   429  		if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 {
   430  			if scratch.deleted == nil {
   431  				scratch.deleted = make(map[int64]struct{})
   432  			}
   433  			scratch.deleted[r.num] = struct{}{}
   434  		}
   435  		if scratch.added != nil {
   436  			delete(scratch.added, r.num)
   437  		}
   438  	}
   439  
   440  	// New tables.
   441  	for _, r := range r.addedTables {
   442  		scratch := p.getScratch(r.level)
   443  		if scratch.added == nil {
   444  			scratch.added = make(map[int64]atRecord)
   445  		}
   446  		scratch.added[r.num] = r
   447  		if scratch.deleted != nil {
   448  			delete(scratch.deleted, r.num)
   449  		}
   450  	}
   451  }
   452  
// finish materializes a new version from the base version plus the
// staged additions/deletions, level by level. When trivial is true the
// caller asserts this is a normal table compaction whose new files do
// not interleave with the surviving ones, so they are inserted at a
// binary-searched index instead of re-sorting the whole level. Empty
// trailing levels are trimmed and the new version's compaction score
// is computed before returning.
func (p *versionStaging) finish(trivial bool) *version {
	// Build new version.
	nv := newVersion(p.base.s)
	numLevel := len(p.levels)
	if len(p.base.levels) > numLevel {
		numLevel = len(p.base.levels)
	}
	nv.levels = make([]tFiles, numLevel)
	for level := 0; level < numLevel; level++ {
		var baseTabels tFiles
		if level < len(p.base.levels) {
			baseTabels = p.base.levels[level]
		}

		if level < len(p.levels) {
			scratch := p.levels[level]

			// Short circuit if there is no change at all.
			if len(scratch.added) == 0 && len(scratch.deleted) == 0 {
				nv.levels[level] = baseTabels
				continue
			}

			var nt tFiles
			// Prealloc list if possible.
			if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
				nt = make(tFiles, 0, n)
			}

			// Base tables.
			for _, t := range baseTabels {
				if _, ok := scratch.deleted[t.fd.Num]; ok {
					continue
				}
				if _, ok := scratch.added[t.fd.Num]; ok {
					continue
				}
				nt = append(nt, t)
			}

			// Avoid resort if only files in this level are deleted
			if len(scratch.added) == 0 {
				nv.levels[level] = nt
				continue
			}

			// For normal table compaction, one compaction will only involve two levels
			// of files. And the new files generated after merging the source level and
			// source+1 level related files can be inserted as a whole into source+1 level
			// without any overlap with the other source+1 files.
			//
			// When the amount of data maintained by leveldb is large, the number of files
			// per level will be very large. While qsort is very inefficient for sorting
			// already ordered arrays. Therefore, for the normal table compaction, we use
			// binary search here to find the insert index to insert a batch of new added
			// files directly instead of using qsort.
			if trivial && len(scratch.added) > 0 {
				added := make(tFiles, 0, len(scratch.added))
				for _, r := range scratch.added {
					added = append(added, tableFileFromRecord(r))
				}
				if level == 0 {
					// Level-0 is ordered by file number.
					added.sortByNum()
					index := nt.searchNumLess(added[len(added)-1].fd.Num)
					nt = append(nt[:index], append(added, nt[index:]...)...)
				} else {
					// Higher levels are ordered by key range.
					added.sortByKey(p.base.s.icmp)
					_, amax := added.getRange(p.base.s.icmp)
					index := nt.searchMin(p.base.s.icmp, amax)
					nt = append(nt[:index], append(added, nt[index:]...)...)
				}
				nv.levels[level] = nt
				continue
			}

			// New tables.
			for _, r := range scratch.added {
				nt = append(nt, tableFileFromRecord(r))
			}

			if len(nt) != 0 {
				// Sort tables.
				if level == 0 {
					nt.sortByNum()
				} else {
					nt.sortByKey(p.base.s.icmp)
				}

				nv.levels[level] = nt
			}
		} else {
			// No staged change for this level; reuse the base tables.
			nv.levels[level] = baseTabels
		}
	}

	// Trim levels.
	n := len(nv.levels)
	for ; n > 0 && nv.levels[n-1] == nil; n-- {
	}
	nv.levels = nv.levels[:n]

	// Compute compaction score for new version.
	nv.computeCompaction()

	return nv
}
   559  
// versionReleaser is an idempotent handle that releases one reference
// to a version; once guards against double release.
type versionReleaser struct {
	v    *version
	once bool
}
   564  
   565  func (vr *versionReleaser) Release() {
   566  	v := vr.v
   567  	v.s.vmu.Lock()
   568  	if !vr.once {
   569  		v.releaseNB()
   570  		vr.once = true
   571  	}
   572  	v.s.vmu.Unlock()
   573  }
   574  

View as plain text