...

Source file src/github.com/syndtr/goleveldb/leveldb/session_compaction.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"sort"
    11  	"sync/atomic"
    12  
    13  	"github.com/syndtr/goleveldb/leveldb/iterator"
    14  	"github.com/syndtr/goleveldb/leveldb/memdb"
    15  	"github.com/syndtr/goleveldb/leveldb/opt"
    16  )
    17  
    18  const (
    19  	undefinedCompaction = iota
    20  	level0Compaction
    21  	nonLevel0Compaction
    22  	seekCompaction
    23  )
    24  
    25  func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
    26  	v := s.version()
    27  	defer v.release()
    28  	return v.pickMemdbLevel(umin, umax, maxLevel)
    29  }
    30  
    31  func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {
    32  	// Create sorted table.
    33  	iter := mdb.NewIterator(nil)
    34  	defer iter.Release()
    35  	t, n, err := s.tops.createFrom(iter)
    36  	if err != nil {
    37  		return 0, err
    38  	}
    39  
    40  	// Pick level other than zero can cause compaction issue with large
    41  	// bulk insert and delete on strictly incrementing key-space. The
    42  	// problem is that the small deletion markers trapped at lower level,
    43  	// while key/value entries keep growing at higher level. Since the
    44  	// key-space is strictly incrementing it will not overlaps with
    45  	// higher level, thus maximum possible level is always picked, while
    46  	// overlapping deletion marker pushed into lower level.
    47  	// See: https://github.com/syndtr/goleveldb/issues/127.
    48  	flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
    49  	rec.addTableFile(flushLevel, t)
    50  
    51  	s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(t.size), t.imin, t.imax)
    52  	return flushLevel, nil
    53  }
    54  
    55  // Pick a compaction based on current state; need external synchronization.
    56  func (s *session) pickCompaction() *compaction {
    57  	v := s.version()
    58  
    59  	var sourceLevel int
    60  	var t0 tFiles
    61  	var typ int
    62  	if v.cScore >= 1 {
    63  		sourceLevel = v.cLevel
    64  		cptr := s.getCompPtr(sourceLevel)
    65  		tables := v.levels[sourceLevel]
    66  		if cptr != nil && sourceLevel > 0 {
    67  			n := len(tables)
    68  			if i := sort.Search(n, func(i int) bool {
    69  				return s.icmp.Compare(tables[i].imax, cptr) > 0
    70  			}); i < n {
    71  				t0 = append(t0, tables[i])
    72  			}
    73  		}
    74  		if len(t0) == 0 {
    75  			t0 = append(t0, tables[0])
    76  		}
    77  		if sourceLevel == 0 {
    78  			typ = level0Compaction
    79  		} else {
    80  			typ = nonLevel0Compaction
    81  		}
    82  	} else {
    83  		if p := atomic.LoadPointer(&v.cSeek); p != nil {
    84  			ts := (*tSet)(p)
    85  			sourceLevel = ts.level
    86  			t0 = append(t0, ts.table)
    87  			typ = seekCompaction
    88  		} else {
    89  			v.release()
    90  			return nil
    91  		}
    92  	}
    93  
    94  	return newCompaction(s, v, sourceLevel, t0, typ)
    95  }
    96  
    97  // Create compaction from given level and range; need external synchronization.
    98  func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit bool) *compaction {
    99  	v := s.version()
   100  
   101  	if sourceLevel >= len(v.levels) {
   102  		v.release()
   103  		return nil
   104  	}
   105  
   106  	t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0)
   107  	if len(t0) == 0 {
   108  		v.release()
   109  		return nil
   110  	}
   111  
   112  	// Avoid compacting too much in one shot in case the range is large.
   113  	// But we cannot do this for level-0 since level-0 files can overlap
   114  	// and we must not pick one file and drop another older file if the
   115  	// two files overlap.
   116  	if !noLimit && sourceLevel > 0 {
   117  		limit := int64(v.s.o.GetCompactionSourceLimit(sourceLevel))
   118  		total := int64(0)
   119  		for i, t := range t0 {
   120  			total += t.size
   121  			if total >= limit {
   122  				s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
   123  				t0 = t0[:i+1]
   124  				break
   125  			}
   126  		}
   127  	}
   128  
   129  	typ := level0Compaction
   130  	if sourceLevel != 0 {
   131  		typ = nonLevel0Compaction
   132  	}
   133  	return newCompaction(s, v, sourceLevel, t0, typ)
   134  }
   135  
   136  func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles, typ int) *compaction {
   137  	c := &compaction{
   138  		s:             s,
   139  		v:             v,
   140  		typ:           typ,
   141  		sourceLevel:   sourceLevel,
   142  		levels:        [2]tFiles{t0, nil},
   143  		maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
   144  		tPtrs:         make([]int, len(v.levels)),
   145  	}
   146  	c.expand()
   147  	c.save()
   148  	return c
   149  }
   150  
// compaction represents the state of a single compaction: its input tables,
// the version they were taken from, and the cursors used while its output is
// being produced.
type compaction struct {
	// s is the owning session. v is the version the inputs were picked from;
	// it is released at most once, by release().
	s *session
	v *version

	// typ is one of the compaction kind constants (level0Compaction, ...).
	typ           int
	// sourceLevel is the level the compaction reads from. levels[0] holds its
	// input tables; levels[1] holds the overlapping tables of sourceLevel+1.
	sourceLevel   int
	levels        [2]tFiles
	// maxGPOverlaps bounds how many grandparent bytes one output may overlap
	// before shouldStopBefore asks for a new output.
	maxGPOverlaps int64

	// gp holds the tables of sourceLevel+2 overlapping the compaction range
	// (empty when that level does not exist). gpi, seenKey and
	// gpOverlappedBytes are shouldStopBefore's cursor state over gp.
	gp                tFiles
	gpi               int
	seenKey           bool
	gpOverlappedBytes int64
	// imin/imax is the internal-key range covered by levels[0].
	imin, imax        internalKey
	// tPtrs holds one forward-only table cursor per level, used by
	// baseLevelForKey.
	tPtrs             []int
	// released records that v has already been released.
	released          bool

	// Snapshot of the cursor state above, taken by save() and reinstated by
	// restore().
	snapGPI               int
	snapSeenKey           bool
	snapGPOverlappedBytes int64
	snapTPtrs             []int
}
   174  
   175  func (c *compaction) save() {
   176  	c.snapGPI = c.gpi
   177  	c.snapSeenKey = c.seenKey
   178  	c.snapGPOverlappedBytes = c.gpOverlappedBytes
   179  	c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
   180  }
   181  
   182  func (c *compaction) restore() {
   183  	c.gpi = c.snapGPI
   184  	c.seenKey = c.snapSeenKey
   185  	c.gpOverlappedBytes = c.snapGPOverlappedBytes
   186  	c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
   187  }
   188  
   189  func (c *compaction) release() {
   190  	if !c.released {
   191  		c.released = true
   192  		c.v.release()
   193  	}
   194  }
   195  
// Expand compacted tables; need external synchronization.
//
// On entry c.levels[0] holds the seed tables from c.sourceLevel. On exit:
//   - c.levels[0] holds the source-level inputs. For level 0 these are grown
//     to every table overlapping the seed's user-key range, and on any level
//     they may be grown further by the step below.
//   - c.levels[1] holds the tables of c.sourceLevel+1 that overlap them.
//   - c.imin/c.imax is the range of c.levels[0].
//   - c.gp holds the tables of c.sourceLevel+2 that overlap the whole
//     compaction range (left untouched when that level does not exist).
func (c *compaction) expand() {
	// Size budget for the combined inputs when growing the source set below.
	limit := int64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
	vt0 := c.v.levels[c.sourceLevel]
	vt1 := tFiles{}
	if level := c.sourceLevel + 1; level < len(c.v.levels) {
		vt1 = c.v.levels[level]
	}

	t0, t1 := c.levels[0], c.levels[1]
	imin, imax := t0.getRange(c.s.icmp)

	// For non-zero levels, the ukey can't hop across tables at all.
	if c.sourceLevel == 0 {
		// We expand t0 here just in case a ukey hops across tables.
		// NOTE(review): t0 is passed as getOverlaps' dst argument. If
		// getOverlaps reuses dst's backing array, this overwrites the
		// original c.levels[0] contents. That is harmless here because
		// c.levels[0] is reassigned at the end, but verify before changing
		// this order.
		t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
		// Recompute the range only if expansion changed the table count.
		if len(t0) != len(c.levels[0]) {
			imin, imax = t0.getRange(c.s.icmp)
		}
	}
	t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
	// Get entire range covered by compaction.
	// append may write into t0's spare capacity, but t0's own length is
	// unchanged, so t0 itself is not affected.
	amin, amax := append(t0, t1...).getRange(c.s.icmp)

	// See if we can grow the number of inputs in "sourceLevel" without
	// changing the number of "sourceLevel+1" files we pick up.
	if len(t1) > 0 {
		exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.sourceLevel == 0)
		// Grow only if it adds source tables and stays under the size budget.
		if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
			xmin, xmax := exp0.getRange(c.s.icmp)
			exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
			// Accept only if the grown range pulls in no extra parent tables.
			if len(exp1) == len(t1) {
				c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
					c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(t0.size()), len(t1), shortenb(t1.size()),
					len(exp0), shortenb(exp0.size()), len(exp1), shortenb(exp1.size()))
				imin, imax = xmin, xmax
				t0, t1 = exp0, exp1
				amin, amax = append(t0, t1...).getRange(c.s.icmp)
			}
		}
	}

	// Compute the set of grandparent files that overlap this compaction
	// (parent == sourceLevel+1; grandparent == sourceLevel+2)
	if level := c.sourceLevel + 2; level < len(c.v.levels) {
		c.gp = c.v.levels[level].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
	}

	c.levels[0], c.levels[1] = t0, t1
	c.imin, c.imax = imin, imax
}
   247  
   248  // Check whether compaction is trivial.
   249  func (c *compaction) trivial() bool {
   250  	return len(c.levels[0]) == 1 && len(c.levels[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
   251  }
   252  
   253  func (c *compaction) baseLevelForKey(ukey []byte) bool {
   254  	for level := c.sourceLevel + 2; level < len(c.v.levels); level++ {
   255  		tables := c.v.levels[level]
   256  		for c.tPtrs[level] < len(tables) {
   257  			t := tables[c.tPtrs[level]]
   258  			if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
   259  				// We've advanced far enough.
   260  				if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
   261  					// Key falls in this file's range, so definitely not base level.
   262  					return false
   263  				}
   264  				break
   265  			}
   266  			c.tPtrs[level]++
   267  		}
   268  	}
   269  	return true
   270  }
   271  
   272  func (c *compaction) shouldStopBefore(ikey internalKey) bool {
   273  	for ; c.gpi < len(c.gp); c.gpi++ {
   274  		gp := c.gp[c.gpi]
   275  		if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
   276  			break
   277  		}
   278  		if c.seenKey {
   279  			c.gpOverlappedBytes += gp.size
   280  		}
   281  	}
   282  	c.seenKey = true
   283  
   284  	if c.gpOverlappedBytes > c.maxGPOverlaps {
   285  		// Too much overlap for current output; start new output.
   286  		c.gpOverlappedBytes = 0
   287  		return true
   288  	}
   289  	return false
   290  }
   291  
   292  // Creates an iterator.
   293  func (c *compaction) newIterator() iterator.Iterator {
   294  	// Creates iterator slice.
   295  	icap := len(c.levels)
   296  	if c.sourceLevel == 0 {
   297  		// Special case for level-0.
   298  		icap = len(c.levels[0]) + 1
   299  	}
   300  	its := make([]iterator.Iterator, 0, icap)
   301  
   302  	// Options.
   303  	ro := &opt.ReadOptions{
   304  		DontFillCache: true,
   305  		Strict:        opt.StrictOverride,
   306  	}
   307  	strict := c.s.o.GetStrict(opt.StrictCompaction)
   308  	if strict {
   309  		ro.Strict |= opt.StrictReader
   310  	}
   311  
   312  	for i, tables := range c.levels {
   313  		if len(tables) == 0 {
   314  			continue
   315  		}
   316  
   317  		// Level-0 is not sorted and may overlaps each other.
   318  		if c.sourceLevel+i == 0 {
   319  			for _, t := range tables {
   320  				its = append(its, c.s.tops.newIterator(t, nil, ro))
   321  			}
   322  		} else {
   323  			it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
   324  			its = append(its, it)
   325  		}
   326  	}
   327  
   328  	return iterator.NewMergedIterator(its, c.s.icmp, strict)
   329  }
   330  

View as plain text