...

Source file src/github.com/syndtr/goleveldb/leveldb/db_compaction.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"fmt"
    11  	"sync"
    12  	"sync/atomic"
    13  	"time"
    14  
    15  	"github.com/syndtr/goleveldb/leveldb/errors"
    16  	"github.com/syndtr/goleveldb/leveldb/opt"
    17  	"github.com/syndtr/goleveldb/leveldb/storage"
    18  )
    19  
    20  var (
    21  	errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
    22  )
    23  
    24  type cStat struct {
    25  	duration time.Duration
    26  	read     int64
    27  	write    int64
    28  }
    29  
    30  func (p *cStat) add(n *cStatStaging) {
    31  	p.duration += n.duration
    32  	p.read += n.read
    33  	p.write += n.write
    34  }
    35  
    36  func (p *cStat) get() (duration time.Duration, read, write int64) {
    37  	return p.duration, p.read, p.write
    38  }
    39  
    40  type cStatStaging struct {
    41  	start    time.Time
    42  	duration time.Duration
    43  	on       bool
    44  	read     int64
    45  	write    int64
    46  }
    47  
    48  func (p *cStatStaging) startTimer() {
    49  	if !p.on {
    50  		p.start = time.Now()
    51  		p.on = true
    52  	}
    53  }
    54  
    55  func (p *cStatStaging) stopTimer() {
    56  	if p.on {
    57  		p.duration += time.Since(p.start)
    58  		p.on = false
    59  	}
    60  }
    61  
    62  type cStats struct {
    63  	lk    sync.Mutex
    64  	stats []cStat
    65  }
    66  
    67  func (p *cStats) addStat(level int, n *cStatStaging) {
    68  	p.lk.Lock()
    69  	if level >= len(p.stats) {
    70  		newStats := make([]cStat, level+1)
    71  		copy(newStats, p.stats)
    72  		p.stats = newStats
    73  	}
    74  	p.stats[level].add(n)
    75  	p.lk.Unlock()
    76  }
    77  
    78  func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
    79  	p.lk.Lock()
    80  	defer p.lk.Unlock()
    81  	if level < len(p.stats) {
    82  		return p.stats[level].get()
    83  	}
    84  	return
    85  }
    86  
// compactionError tracks the current compaction error reported through
// db.compErrSetC and offers it to readers of the error channels. It loops
// until db.closeC becomes ready and is written as a three-state machine using
// labels:
//
//   - noerr: no error is set; nothing is offered.
//   - haserr: a transient error is set; it is offered on db.compErrC. A nil
//     error received on db.compErrSetC moves back to noerr.
//   - hasperr: a persistent error (ErrReadOnly or an error for which
//     errors.IsCorrupted is true) is set; it is offered on db.compErrC and
//     db.compPerErrC, and the write lock is taken. This state no longer reads
//     db.compErrSetC and is only left when db.closeC becomes ready.
func (db *DB) compactionError() {
	var err error
noerr:
	// No error.
	for {
		select {
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				// Still no error; stay in this state.
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				goto haserr
			}
		case <-db.closeC:
			return
		}
	}
haserr:
	// Transient error.
	for {
		select {
		case db.compErrC <- err:
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				goto noerr
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				// Another transient error; keep offering the newest one.
			}
		case <-db.closeC:
			return
		}
	}
hasperr:
	// Persistent error.
	for {
		select {
		case db.compErrC <- err:
		case db.compPerErrC <- err:
		case db.writeLockC <- struct{}{}:
			// Hold write lock, so that write won't pass-through.
			db.compWriteLocking = true
		case <-db.closeC:
			if db.compWriteLocking {
				// We should release the lock or Close will hang.
				<-db.writeLockC
			}
			return
		}
	}
}
   140  
   141  type compactionTransactCounter int
   142  
   143  func (cnt *compactionTransactCounter) incr() {
   144  	*cnt++
   145  }
   146  
// compactionTransactInterface is a unit of compaction work that
// compactionTransact can execute, retry and roll back.
type compactionTransactInterface interface {
	// run executes the work once. cnt is a fresh counter for this run; the
	// caller uses its final value to detect whether retries are advancing.
	run(cnt *compactionTransactCounter) error
	// revert is called by compactionTransact when the transaction is
	// abandoned through compactionExitTransact.
	revert() error
}
   151  
   152  func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
   153  	defer func() {
   154  		if x := recover(); x != nil {
   155  			if x == errCompactionTransactExiting {
   156  				if err := t.revert(); err != nil {
   157  					db.logf("%s revert error %q", name, err)
   158  				}
   159  			}
   160  			panic(x)
   161  		}
   162  	}()
   163  
   164  	const (
   165  		backoffMin = 1 * time.Second
   166  		backoffMax = 8 * time.Second
   167  		backoffMul = 2 * time.Second
   168  	)
   169  	var (
   170  		backoff  = backoffMin
   171  		backoffT = time.NewTimer(backoff)
   172  		lastCnt  = compactionTransactCounter(0)
   173  
   174  		disableBackoff = db.s.o.GetDisableCompactionBackoff()
   175  	)
   176  	for n := 0; ; n++ {
   177  		// Check whether the DB is closed.
   178  		if db.isClosed() {
   179  			db.logf("%s exiting", name)
   180  			db.compactionExitTransact()
   181  		} else if n > 0 {
   182  			db.logf("%s retrying N·%d", name, n)
   183  		}
   184  
   185  		// Execute.
   186  		cnt := compactionTransactCounter(0)
   187  		err := t.run(&cnt)
   188  		if err != nil {
   189  			db.logf("%s error I·%d %q", name, cnt, err)
   190  		}
   191  
   192  		// Set compaction error status.
   193  		select {
   194  		case db.compErrSetC <- err:
   195  		case perr := <-db.compPerErrC:
   196  			if err != nil {
   197  				db.logf("%s exiting (persistent error %q)", name, perr)
   198  				db.compactionExitTransact()
   199  			}
   200  		case <-db.closeC:
   201  			db.logf("%s exiting", name)
   202  			db.compactionExitTransact()
   203  		}
   204  		if err == nil {
   205  			return
   206  		}
   207  		if errors.IsCorrupted(err) {
   208  			db.logf("%s exiting (corruption detected)", name)
   209  			db.compactionExitTransact()
   210  		}
   211  
   212  		if !disableBackoff {
   213  			// Reset backoff duration if counter is advancing.
   214  			if cnt > lastCnt {
   215  				backoff = backoffMin
   216  				lastCnt = cnt
   217  			}
   218  
   219  			// Backoff.
   220  			backoffT.Reset(backoff)
   221  			if backoff < backoffMax {
   222  				backoff *= backoffMul
   223  				if backoff > backoffMax {
   224  					backoff = backoffMax
   225  				}
   226  			}
   227  			select {
   228  			case <-backoffT.C:
   229  			case <-db.closeC:
   230  				db.logf("%s exiting", name)
   231  				db.compactionExitTransact()
   232  			}
   233  		}
   234  	}
   235  }
   236  
   237  type compactionTransactFunc struct {
   238  	runFunc    func(cnt *compactionTransactCounter) error
   239  	revertFunc func() error
   240  }
   241  
   242  func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
   243  	return t.runFunc(cnt)
   244  }
   245  
   246  func (t *compactionTransactFunc) revert() error {
   247  	if t.revertFunc != nil {
   248  		return t.revertFunc()
   249  	}
   250  	return nil
   251  }
   252  
   253  func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
   254  	db.compactionTransact(name, &compactionTransactFunc{run, revert})
   255  }
   256  
// compactionExitTransact abandons the current compaction work by panicking
// with errCompactionTransactExiting. It never returns normally; callers rely
// on a deferred recover further up the stack to catch the sentinel.
func (db *DB) compactionExitTransact() {
	panic(errCompactionTransactExiting)
}
   260  
// compactionCommit commits the session record rec as a compaction transaction
// named name+"@commit", serialised by db.compCommitLk. No revert function is
// supplied for the commit.
func (db *DB) compactionCommit(name string, rec *sessionRecord) {
	db.compCommitLk.Lock()
	// The transaction may leave by panicking (see compactionExitTransact), so
	// the lock has to be released through defer.
	defer db.compCommitLk.Unlock() // Defer is necessary.
	db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
		return db.s.commit(rec, true)
	}, nil)
}
   268  
// memCompaction flushes the frozen memdb, if there is one, into table files
// and commits the result.
//
// The steps are: pause table compaction, build the tables inside a retryable
// transaction, commit the session record, record the statistics, drop the
// frozen memdb, resume table compaction and finally trigger a table
// compaction. An empty frozen memdb is dropped without flushing.
//
// The function may leave by panicking with errCompactionTransactExiting when
// the DB is being closed.
func (db *DB) memCompaction() {
	mdb := db.getFrozenMem()
	if mdb == nil {
		return
	}
	defer mdb.decref()

	db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(int64(mdb.Size())))

	// Don't compact empty memdb.
	if mdb.Len() == 0 {
		db.logf("memdb@flush skipping")
		// Drop frozen memdb.
		db.dropFrozenMem()
		return
	}

	// Pause table compaction.
	resumeC := make(chan struct{})
	select {
	case db.tcompPauseC <- (chan<- struct{})(resumeC):
	case <-db.compPerErrC:
		// A persistent error is pending; skip the pause handshake and mark
		// that there is nothing to resume later.
		close(resumeC)
		resumeC = nil
	case <-db.closeC:
		db.compactionExitTransact()
	}

	var (
		rec        = &sessionRecord{}
		stats      = &cStatStaging{}
		flushLevel int
	)

	// Generate tables.
	db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
		stats.startTimer()
		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
		stats.stopTimer()
		return
	}, func() error {
		// Revert: remove every table file recorded as added so far.
		for _, r := range rec.addedTables {
			db.logf("memdb@flush revert @%d", r.num)
			if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
				return err
			}
		}
		return nil
	})

	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.frozenSeq)

	// Commit.
	stats.startTimer()
	db.compactionCommit("memdb", rec)
	stats.stopTimer()

	db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)

	// Save compaction stats.
	for _, r := range rec.addedTables {
		stats.write += r.size
	}
	db.compStats.addStat(flushLevel, stats)
	atomic.AddUint32(&db.memComp, 1)

	// Drop frozen memdb.
	db.dropFrozenMem()

	// Resume table compaction.
	if resumeC != nil {
		select {
		case <-resumeC:
			close(resumeC)
		case <-db.closeC:
			db.compactionExitTransact()
		}
	}

	// Trigger table compaction.
	db.compTrigger(db.tcompCmdC)
}
   352  
// tableCompactionBuilder is the compaction transaction that merges the input
// tables of a compaction into new tables of the next level. Its run method is
// resumable: the snap* fields hold the state captured after the most recent
// table flush so that a retry can skip the entries already written.
type tableCompactionBuilder struct {
	db    *DB            // may be nil; appendKV then skips the pause/close checks
	s     *session       // used to create tables, compare keys and log
	c     *compaction    // the compaction being executed
	rec   *sessionRecord // receives every table produced by flush
	stat1 *cStatStaging  // timed by run; flush adds the size of each table written

	// State snapshot taken by run after each table flush.
	snapHasLastUkey bool
	snapLastUkey    []byte
	snapLastSeq     uint64
	snapIter        int // iterator position at which the snapshot was taken
	snapKerrCnt     int
	snapDropCnt     int

	kerrCnt int // keys for which parseInternalKey returned an error
	dropCnt int // entries dropped by run

	minSeq    uint64 // sequence threshold used by the drop rules in run
	strict    bool   // when set, run fails on the first key parse error
	tableSize int    // size passed to table creation and checked by needFlush

	tw *tWriter // table currently being written; nil when none is open
}
   376  
   377  func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
   378  	// Create new table if not already.
   379  	if b.tw == nil {
   380  		// Check for pause event.
   381  		if b.db != nil {
   382  			select {
   383  			case ch := <-b.db.tcompPauseC:
   384  				b.db.pauseCompaction(ch)
   385  			case <-b.db.closeC:
   386  				b.db.compactionExitTransact()
   387  			default:
   388  			}
   389  		}
   390  
   391  		// Create new table.
   392  		var err error
   393  		b.tw, err = b.s.tops.create(b.tableSize)
   394  		if err != nil {
   395  			return err
   396  		}
   397  	}
   398  
   399  	// Write key/value into table.
   400  	return b.tw.append(key, value)
   401  }
   402  
// needFlush reports whether the table being written has reached the
// configured table size. It dereferences b.tw, so a table must be open.
func (b *tableCompactionBuilder) needFlush() bool {
	return b.tw.tw.BytesLen() >= b.tableSize
}
   406  
// flush finishes the table being written, records it in the session record
// for level sourceLevel+1, adds its size to the write statistics and clears
// b.tw. On error the writer is left in place so that cleanup can drop it.
func (b *tableCompactionBuilder) flush() error {
	t, err := b.tw.finish()
	if err != nil {
		return err
	}
	b.rec.addTableFile(b.c.sourceLevel+1, t)
	b.stat1.write += t.size
	b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(t.size), t.imin, t.imax)
	b.tw = nil
	return nil
}
   418  
   419  func (b *tableCompactionBuilder) cleanup() error {
   420  	if b.tw != nil {
   421  		if err := b.tw.drop(); err != nil {
   422  			return err
   423  		}
   424  		b.tw = nil
   425  	}
   426  	return nil
   427  }
   428  
// run performs the compaction: it iterates over the merged input, drops
// entries that are obsolete and appends the rest to output tables, rotating
// tables as they fill up or when the compaction asks to stop before a key.
//
// run is resumable. It starts from the snapshot stored in the snap* fields,
// skipping the first b.snapIter entries, and refreshes the snapshot after
// every successful table flush. cnt is incremented once per iterated entry,
// including skipped ones.
//
// Any table still open when run returns is dropped by the deferred cleanup; a
// cleanup error is returned, combined with the run error if there is one.
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) {
	snapResumed := b.snapIter > 0
	hasLastUkey := b.snapHasLastUkey // The key might have zero length, so this is necessary.
	lastUkey := append([]byte(nil), b.snapLastUkey...)
	lastSeq := b.snapLastSeq
	b.kerrCnt = b.snapKerrCnt
	b.dropCnt = b.snapDropCnt
	// Restore compaction state.
	b.c.restore()

	defer func() {
		if cerr := b.cleanup(); cerr != nil {
			if err == nil {
				err = cerr
			} else {
				err = fmt.Errorf("tableCompactionBuilder error: %v, cleanup error (%v)", err, cerr)
			}
		}
	}()

	b.stat1.startTimer()
	defer b.stat1.stopTimer()

	iter := b.c.newIterator()
	defer iter.Release()
	for i := 0; iter.Next(); i++ {
		// Incr transact counter.
		cnt.incr()

		// Skip until last state.
		if i < b.snapIter {
			continue
		}

		// resumed is true only for the first entry processed after a resume.
		resumed := false
		if snapResumed {
			resumed = true
			snapResumed = false
		}

		ikey := iter.Key()
		ukey, seq, kt, kerr := parseInternalKey(ikey)

		if kerr == nil {
			// shouldStopBefore is not consulted for the first entry after a
			// resume.
			shouldStop := !resumed && b.c.shouldStopBefore(ikey)

			if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
				// First occurrence of this user key.

				// Only rotate tables if ukey doesn't hop across.
				if b.tw != nil && (shouldStop || b.needFlush()) {
					if err := b.flush(); err != nil {
						return err
					}

					// Creates snapshot of the state.
					b.c.save()
					b.snapHasLastUkey = hasLastUkey
					b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
					b.snapLastSeq = lastSeq
					b.snapIter = i
					b.snapKerrCnt = b.kerrCnt
					b.snapDropCnt = b.dropCnt
				}

				hasLastUkey = true
				lastUkey = append(lastUkey[:0], ukey...)
				lastSeq = keyMaxSeq
			}

			switch {
			case lastSeq <= b.minSeq:
				// Dropped because newer entry for same user key exist
				fallthrough // (A)
			case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
				// For this user key:
				// (1) there is no data in higher levels
				// (2) data in lower levels will have larger seq numbers
				// (3) data in layers that are being compacted here and have
				//     smaller seq numbers will be dropped in the next
				//     few iterations of this loop (by rule (A) above).
				// Therefore this deletion marker is obsolete and can be dropped.
				lastSeq = seq
				b.dropCnt++
				continue
			default:
				lastSeq = seq
			}
		} else {
			if b.strict {
				return kerr
			}

			// Don't drop corrupted keys.
			hasLastUkey = false
			lastUkey = lastUkey[:0]
			lastSeq = keyMaxSeq
			b.kerrCnt++
		}

		if err := b.appendKV(ikey, iter.Value()); err != nil {
			return err
		}
	}

	if err := iter.Error(); err != nil {
		return err
	}

	// Finish last table.
	if b.tw != nil && !b.tw.empty() {
		return b.flush()
	}
	return nil
}
   544  
   545  func (b *tableCompactionBuilder) revert() error {
   546  	for _, at := range b.rec.addedTables {
   547  		b.s.logf("table@build revert @%d", at.num)
   548  		if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
   549  			return err
   550  		}
   551  	}
   552  	return nil
   553  }
   554  
// tableCompaction executes the compaction c and commits the result.
//
// When noTrivial is false and c.trivial() holds, the single source table is
// only moved to the next level. Otherwise the input tables are merged by a
// tableCompactionBuilder transaction, the resulting session record is
// committed, and the statistics and per-type compaction counters are updated.
// c is released when the function returns.
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
	defer c.release()

	rec := &sessionRecord{}
	rec.addCompPtr(c.sourceLevel, c.imax)

	if !noTrivial && c.trivial() {
		// Trivial move: re-register the table one level down without
		// rewriting it.
		t := c.levels[0][0]
		db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
		rec.delTable(c.sourceLevel, t.fd.Num)
		rec.addTableFile(c.sourceLevel+1, t)
		db.compactionCommit("table-move", rec)
		return
	}

	// stats[0] covers the source level inputs, stats[1] the next level inputs
	// plus everything written and timed by the builder.
	var stats [2]cStatStaging
	for i, tables := range c.levels {
		for _, t := range tables {
			stats[i].read += t.size
			// Insert deleted tables into record
			rec.delTable(c.sourceLevel+i, t.fd.Num)
		}
	}
	sourceSize := stats[0].read + stats[1].read
	minSeq := db.minSeq()
	db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)

	b := &tableCompactionBuilder{
		db:        db,
		s:         db.s,
		c:         c,
		rec:       rec,
		stat1:     &stats[1],
		minSeq:    minSeq,
		strict:    db.s.o.GetStrict(opt.StrictCompaction),
		tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
	}
	db.compactionTransact("table@build", b)

	// Commit.
	stats[1].startTimer()
	db.compactionCommit("table", rec)
	stats[1].stopTimer()

	resultSize := stats[1].write
	db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)

	// Save compaction stats. Both staging records are accounted to the
	// destination level.
	for i := range stats {
		db.compStats.addStat(c.sourceLevel+1, &stats[i])
	}
	switch c.typ {
	case level0Compaction:
		atomic.AddUint32(&db.level0Comp, 1)
	case nonLevel0Compaction:
		atomic.AddUint32(&db.nonLevel0Comp, 1)
	case seekCompaction:
		atomic.AddUint32(&db.seekComp, 1)
	}
}
   615  
   616  func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
   617  	db.logf("table@compaction range L%d %q:%q", level, umin, umax)
   618  	if level >= 0 {
   619  		if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
   620  			db.tableCompaction(c, true)
   621  		}
   622  	} else {
   623  		// Retry until nothing to compact.
   624  		for {
   625  			compacted := false
   626  
   627  			// Scan for maximum level with overlapped tables.
   628  			v := db.s.version()
   629  			m := 1
   630  			for i := m; i < len(v.levels); i++ {
   631  				tables := v.levels[i]
   632  				if tables.overlaps(db.s.icmp, umin, umax, false) {
   633  					m = i
   634  				}
   635  			}
   636  			v.release()
   637  
   638  			for level := 0; level < m; level++ {
   639  				if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
   640  					db.tableCompaction(c, true)
   641  					compacted = true
   642  				}
   643  			}
   644  
   645  			if !compacted {
   646  				break
   647  			}
   648  		}
   649  	}
   650  
   651  	return nil
   652  }
   653  
   654  func (db *DB) tableAutoCompaction() {
   655  	if c := db.s.pickCompaction(); c != nil {
   656  		db.tableCompaction(c, false)
   657  	}
   658  }
   659  
// tableNeedCompaction reports whether the current version says a compaction
// is needed. The version is released before returning.
func (db *DB) tableNeedCompaction() bool {
	v := db.s.version()
	defer v.release()
	return v.needCompaction()
}
   665  
// resumeWrite reports whether write operations may resume, that is whether
// the number of level-0 tables in the current version is below the configured
// level-0 write pause trigger.
func (db *DB) resumeWrite() bool {
	v := db.s.version()
	defer v.release()
	return v.tLen(0) < db.s.o.GetWriteL0PauseTrigger()
}
   672  
// pauseCompaction blocks until a value can be sent on ch, which is how the
// party that requested the pause (see memCompaction) lets the compaction
// continue. If the DB is closed while waiting, the current compaction work is
// abandoned through compactionExitTransact.
func (db *DB) pauseCompaction(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	case <-db.closeC:
		db.compactionExitTransact()
	}
}
   680  
// cCmd is a command sent to a compaction goroutine.
type cCmd interface {
	// ack reports the outcome of the command to whoever issued it.
	ack(err error)
}
   684  
   685  type cAuto struct {
   686  	// Note for table compaction, an non-empty ackC represents it's a compaction waiting command.
   687  	ackC chan<- error
   688  }
   689  
   690  func (r cAuto) ack(err error) {
   691  	if r.ackC != nil {
   692  		defer func() {
   693  			_ = recover()
   694  		}()
   695  		r.ackC <- err
   696  	}
   697  }
   698  
   699  type cRange struct {
   700  	level    int
   701  	min, max []byte
   702  	ackC     chan<- error
   703  }
   704  
   705  func (r cRange) ack(err error) {
   706  	if r.ackC != nil {
   707  		defer func() {
   708  			_ = recover()
   709  		}()
   710  		r.ackC <- err
   711  	}
   712  }
   713  
// compTrigger requests an automatic compaction on compC without waiting for
// it. The send is non-blocking: if compC cannot accept the command right now
// the request is dropped.
func (db *DB) compTrigger(compC chan<- cCmd) {
	select {
	case compC <- cAuto{}:
	default:
	}
}
   721  
   722  // This will trigger auto compaction and/or wait for all compaction to be done.
   723  func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
   724  	ch := make(chan error)
   725  	defer close(ch)
   726  	// Send cmd.
   727  	select {
   728  	case compC <- cAuto{ch}:
   729  	case err = <-db.compErrC:
   730  		return
   731  	case <-db.closeC:
   732  		return ErrClosed
   733  	}
   734  	// Wait cmd.
   735  	select {
   736  	case err = <-ch:
   737  	case err = <-db.compErrC:
   738  	case <-db.closeC:
   739  		return ErrClosed
   740  	}
   741  	return err
   742  }
   743  
   744  // Send range compaction request.
   745  func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
   746  	ch := make(chan error)
   747  	defer close(ch)
   748  	// Send cmd.
   749  	select {
   750  	case compC <- cRange{level, min, max, ch}:
   751  	case err := <-db.compErrC:
   752  		return err
   753  	case <-db.closeC:
   754  		return ErrClosed
   755  	}
   756  	// Wait cmd.
   757  	select {
   758  	case err = <-ch:
   759  	case err = <-db.compErrC:
   760  	case <-db.closeC:
   761  		return ErrClosed
   762  	}
   763  	return err
   764  }
   765  
   766  func (db *DB) mCompaction() {
   767  	var x cCmd
   768  
   769  	defer func() {
   770  		if x := recover(); x != nil {
   771  			if x != errCompactionTransactExiting {
   772  				panic(x)
   773  			}
   774  		}
   775  		if x != nil {
   776  			x.ack(ErrClosed)
   777  		}
   778  		db.closeW.Done()
   779  	}()
   780  
   781  	for {
   782  		select {
   783  		case x = <-db.mcompCmdC:
   784  			switch x.(type) {
   785  			case cAuto:
   786  				db.memCompaction()
   787  				x.ack(nil)
   788  				x = nil
   789  			default:
   790  				panic("leveldb: unknown command")
   791  			}
   792  		case <-db.closeC:
   793  			return
   794  		}
   795  	}
   796  }
   797  
// tCompaction is the table compaction loop. Each iteration it picks up at
// most one command from db.tcompCmdC, serves pause requests from
// db.tcompPauseC, handles the command and then runs an automatic table
// compaction. It returns when db.closeC becomes ready.
//
// cAuto commands that carry an ack channel are acknowledged at once when
// writes may resume, and are otherwise parked in waitQ until either
// resumeWrite holds or no compaction is needed any more. cRange commands are
// executed and acknowledged immediately.
//
// On exit every parked or in-flight command is answered with ErrClosed and
// db.closeW is marked done. A panic other than errCompactionTransactExiting
// is propagated.
func (db *DB) tCompaction() {
	var (
		x     cCmd
		waitQ []cCmd
	)

	defer func() {
		// Note: the x declared here shadows the command variable only inside
		// this if statement.
		if x := recover(); x != nil {
			if x != errCompactionTransactExiting {
				panic(x)
			}
		}
		for i := range waitQ {
			waitQ[i].ack(ErrClosed)
			waitQ[i] = nil
		}
		if x != nil {
			x.ack(ErrClosed)
		}
		db.closeW.Done()
	}()

	for {
		if db.tableNeedCompaction() {
			// There is work to do: poll for a command without blocking.
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			default:
			}
			// Resume write operation as soon as possible.
			if len(waitQ) > 0 && db.resumeWrite() {
				for i := range waitQ {
					waitQ[i].ack(nil)
					waitQ[i] = nil
				}
				waitQ = waitQ[:0]
			}
		} else {
			// Nothing left to compact: release all waiters, then block until
			// something happens.
			for i := range waitQ {
				waitQ[i].ack(nil)
				waitQ[i] = nil
			}
			waitQ = waitQ[:0]
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			}
		}
		if x != nil {
			switch cmd := x.(type) {
			case cAuto:
				if cmd.ackC != nil {
					// Check the write pause state before caching it.
					if db.resumeWrite() {
						x.ack(nil)
					} else {
						waitQ = append(waitQ, x)
					}
				}
			case cRange:
				x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
			default:
				panic("leveldb: unknown command")
			}
			x = nil
		}
		db.tableAutoCompaction()
	}
}
   875  

View as plain text