...

Source file src/github.com/syndtr/goleveldb/leveldb/db.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"container/list"
    11  	"fmt"
    12  	"io"
    13  	"os"
    14  	"runtime"
    15  	"strings"
    16  	"sync"
    17  	"sync/atomic"
    18  	"time"
    19  
    20  	"github.com/syndtr/goleveldb/leveldb/cache"
    21  	"github.com/syndtr/goleveldb/leveldb/errors"
    22  	"github.com/syndtr/goleveldb/leveldb/iterator"
    23  	"github.com/syndtr/goleveldb/leveldb/journal"
    24  	"github.com/syndtr/goleveldb/leveldb/memdb"
    25  	"github.com/syndtr/goleveldb/leveldb/opt"
    26  	"github.com/syndtr/goleveldb/leveldb/storage"
    27  	"github.com/syndtr/goleveldb/leveldb/table"
    28  	"github.com/syndtr/goleveldb/leveldb/util"
    29  )
    30  
// DB is a LevelDB database.
type DB struct {
	// Need 64-bit alignment: seq is accessed atomically, and on 32-bit
	// platforms atomic 64-bit ops require the field to be 8-byte aligned,
	// so it must stay first in the struct.
	seq uint64

	// Stats. Need 64-bit alignment (same reason as seq; cWriteDelay is
	// an atomically-updated int64).
	cWriteDelay            int64 // The cumulative duration of write delays
	cWriteDelayN           int32 // The cumulative number of write delays
	inWritePaused          int32 // The indicator whether write operation is paused by compaction
	aliveSnaps, aliveIters int32

	// Compaction statistic (all updated atomically).
	memComp       uint32 // The cumulative number of memory compaction
	level0Comp    uint32 // The cumulative number of level0 compaction
	nonLevel0Comp uint32 // The cumulative number of non-level0 compaction
	seekComp      uint32 // The cumulative number of seek compaction

	// Session.
	s *session

	// MemDB. memMu guards the mem/frozenMem pair and the journal fields.
	memMu           sync.RWMutex
	memPool         chan *memdb.DB
	mem, frozenMem  *memDB
	journal         *journal.Writer
	journalWriter   storage.Writer
	journalFd       storage.FileDesc
	frozenJournalFd storage.FileDesc
	frozenSeq       uint64

	// Snapshot.
	snapsMu   sync.Mutex
	snapsList *list.List

	// Write.
	batchPool    sync.Pool
	writeMergeC  chan writeMerge
	writeMergedC chan bool
	writeLockC   chan struct{} // capacity 1; acts as the write mutex
	writeAckC    chan error
	writeDelay   time.Duration
	writeDelayN  int
	tr           *Transaction

	// Compaction.
	compCommitLk     sync.Mutex
	tcompCmdC        chan cCmd
	tcompPauseC      chan chan<- struct{}
	mcompCmdC        chan cCmd
	compErrC         chan error
	compPerErrC      chan error
	compErrSetC      chan error
	compWriteLocking bool
	compStats        cStats
	memdbMaxLevel    int // For testing.

	// Close.
	closeW sync.WaitGroup
	closeC chan struct{}
	closed uint32
	closer io.Closer
}
    93  
// openDB finishes opening a recovered session: it builds the DB value,
// replays journals (read-only or read-write), removes obsolete files,
// and starts the background goroutines before handing out the *DB.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeqNum,
		// MemDB
		memPool: make(chan *memdb.DB, 1),
		// Snapshot
		snapsList: list.New(),
		// Write
		batchPool:    sync.Pool{New: newBatch},
		writeMergeC:  make(chan writeMerge),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		// Compaction
		tcompCmdC:   make(chan cCmd),
		tcompPauseC: make(chan chan<- struct{}),
		mcompCmdC:   make(chan cCmd),
		compErrC:    make(chan error),
		compPerErrC: make(chan error),
		compErrSetC: make(chan error),
		// Close
		closeC: make(chan struct{}),
	}

	// Read-only mode.
	readOnly := s.o.GetReadOnly()

	if readOnly {
		// Recover journals (read-only mode): replay into memory only,
		// nothing is written back to storage.
		if err := db.recoverJournalRO(); err != nil {
			return nil, err
		}
	} else {
		// Recover journals.
		if err := db.recoverJournal(); err != nil {
			return nil, err
		}

		// Remove any obsolete files.
		if err := db.checkAndCleanFiles(); err != nil {
			// Close journal opened by recoverJournal before bailing out.
			if db.journal != nil {
				db.journal.Close()
				db.journalWriter.Close()
			}
			return nil, err
		}
	}

	// Doesn't need to be included in the wait group.
	go db.compactionError()
	go db.mpoolDrain()

	if readOnly {
		if err := db.SetReadOnly(); err != nil {
			return nil, err
		}
	} else {
		db.closeW.Add(2)
		go db.tCompaction()
		go db.mCompaction()
		// go db.jWriter()
	}

	s.logf("db@open done T·%v", time.Since(start))

	// Finalizer closes DBs that callers forgot to Close explicitly.
	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}
   167  
   168  // Open opens or creates a DB for the given storage.
   169  // The DB will be created if not exist, unless ErrorIfMissing is true.
   170  // Also, if ErrorIfExist is true and the DB exist Open will returns
   171  // os.ErrExist error.
   172  //
   173  // Open will return an error with type of ErrCorrupted if corruption
   174  // detected in the DB. Use errors.IsCorrupted to test whether an error is
   175  // due to corruption. Corrupted DB can be recovered with Recover function.
   176  //
   177  // The returned DB instance is safe for concurrent use.
   178  // The DB must be closed after use, by calling Close method.
   179  func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
   180  	s, err := newSession(stor, o)
   181  	if err != nil {
   182  		return
   183  	}
   184  	defer func() {
   185  		if err != nil {
   186  			s.close()
   187  			s.release()
   188  		}
   189  	}()
   190  
   191  	err = s.recover()
   192  	if err != nil {
   193  		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() {
   194  			return
   195  		}
   196  		err = s.create()
   197  		if err != nil {
   198  			return
   199  		}
   200  	} else if s.o.GetErrorIfExist() {
   201  		err = os.ErrExist
   202  		return
   203  	}
   204  
   205  	return openDB(s)
   206  }
   207  
   208  // OpenFile opens or creates a DB for the given path.
   209  // The DB will be created if not exist, unless ErrorIfMissing is true.
   210  // Also, if ErrorIfExist is true and the DB exist OpenFile will returns
   211  // os.ErrExist error.
   212  //
   213  // OpenFile uses standard file-system backed storage implementation as
   214  // described in the leveldb/storage package.
   215  //
   216  // OpenFile will return an error with type of ErrCorrupted if corruption
   217  // detected in the DB. Use errors.IsCorrupted to test whether an error is
   218  // due to corruption. Corrupted DB can be recovered with Recover function.
   219  //
   220  // The returned DB instance is safe for concurrent use.
   221  // The DB must be closed after use, by calling Close method.
   222  func OpenFile(path string, o *opt.Options) (db *DB, err error) {
   223  	stor, err := storage.OpenFile(path, o.GetReadOnly())
   224  	if err != nil {
   225  		return
   226  	}
   227  	db, err = Open(stor, o)
   228  	if err != nil {
   229  		stor.Close()
   230  	} else {
   231  		db.closer = stor
   232  	}
   233  	return
   234  }
   235  
   236  // Recover recovers and opens a DB with missing or corrupted manifest files
   237  // for the given storage. It will ignore any manifest files, valid or not.
   238  // The DB must already exist or it will returns an error.
   239  // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
   240  //
   241  // The returned DB instance is safe for concurrent use.
   242  // The DB must be closed after use, by calling Close method.
   243  func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
   244  	s, err := newSession(stor, o)
   245  	if err != nil {
   246  		return
   247  	}
   248  	defer func() {
   249  		if err != nil {
   250  			s.close()
   251  			s.release()
   252  		}
   253  	}()
   254  
   255  	err = recoverTable(s, o)
   256  	if err != nil {
   257  		return
   258  	}
   259  	return openDB(s)
   260  }
   261  
   262  // RecoverFile recovers and opens a DB with missing or corrupted manifest files
   263  // for the given path. It will ignore any manifest files, valid or not.
   264  // The DB must already exist or it will returns an error.
   265  // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
   266  //
   267  // RecoverFile uses standard file-system backed storage implementation as described
   268  // in the leveldb/storage package.
   269  //
   270  // The returned DB instance is safe for concurrent use.
   271  // The DB must be closed after use, by calling Close method.
   272  func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
   273  	stor, err := storage.OpenFile(path, false)
   274  	if err != nil {
   275  		return
   276  	}
   277  	db, err = Recover(stor, o)
   278  	if err != nil {
   279  		stor.Close()
   280  	} else {
   281  		db.closer = stor
   282  	}
   283  	return
   284  }
   285  
// recoverTable rebuilds a manifest from the table files present in the
// storage, ignoring any existing manifests. Each table is scanned;
// tables containing corruption are either rebuilt from their good keys
// or dropped (when StrictRecovery is set). All surviving tables are
// registered at level 0 and a fresh manifest is created and committed.
func recoverTable(s *session, o *opt.Options) error {
	o = dupOptions(o)
	// Mask StrictReader, lets StrictRecovery doing its job.
	o.Strict &= ^opt.StrictReader

	// Get all tables and sort it by file number.
	fds, err := s.stor.List(storage.TypeTable)
	if err != nil {
		return err
	}
	sortFds(fds)

	var (
		maxSeq                                                            uint64
		recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int

		// We will drop corrupted table.
		strict = o.GetStrict(opt.StrictRecovery)
		noSync = o.GetNoSync()

		rec   = &sessionRecord{}
		bpool = util.NewBufferPool(o.GetBlockSize() + 5)
	)
	// buildTable copies the entries produced by iter into a fresh
	// temporary table file, returning its descriptor and byte size. On
	// failure the temporary file is removed and a zero FileDesc is
	// returned (see the deferred cleanup).
	buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
		tmpFd = s.newTemp()
		writer, err := s.stor.Create(tmpFd)
		if err != nil {
			return
		}
		defer func() {
			if cerr := writer.Close(); cerr != nil {
				if err == nil {
					err = cerr
				} else {
					err = fmt.Errorf("error recovering table (%v); error closing (%v)", err, cerr)
				}
			}
			if err != nil {
				// Best-effort removal of the partial temp file.
				if rerr := s.stor.Remove(tmpFd); rerr != nil {
					err = fmt.Errorf("error recovering table (%v); error removing (%v)", err, rerr)
				}
				tmpFd = storage.FileDesc{}
			}
		}()

		// Copy entries; keys that fail validation are silently skipped.
		tw := table.NewWriter(writer, o, nil, 0)
		for iter.Next() {
			key := iter.Key()
			if validInternalKey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		err = iter.Error()
		// Corruption errors from the source iterator are tolerated here:
		// the whole point is salvaging what we can.
		if err != nil && !errors.IsCorrupted(err) {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		if !noSync {
			err = writer.Sync()
			if err != nil {
				return
			}
		}
		size = int64(tw.BytesLen())
		return
	}
	// recoverTable processes a single table file: scan it, tally good
	// and corrupted keys, then keep, rebuild, or drop it.
	recoverTable := func(fd storage.FileDesc) error {
		s.logf("table@recovery recovering @%d", fd.Num)
		reader, err := s.stor.Open(fd)
		if err != nil {
			return err
		}
		var closed bool
		defer func() {
			if !closed {
				reader.Close()
			}
		}()

		// Get file size (seek to end).
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}

		var (
			tSeq                                     uint64
			tgoodKey, tcorruptedKey, tcorruptedBlock int
			imin, imax                               []byte
		)
		tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
		if err != nil {
			return err
		}
		iter := tr.NewIterator(nil, nil)
		if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
			// Count corrupted blocks instead of aborting the scan.
			itererr.SetErrorCallback(func(err error) {
				if errors.IsCorrupted(err) {
					s.logf("table@recovery block corruption @%d %q", fd.Num, err)
					tcorruptedBlock++
				}
			})
		}

		// Scan the table, tracking max sequence number and key range.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, kerr := parseInternalKey(key)
			if kerr != nil {
				tcorruptedKey++
				continue
			}
			tgoodKey++
			if seq > tSeq {
				tSeq = seq
			}
			if imin == nil {
				imin = append([]byte(nil), key...)
			}
			imax = append(imax[:0], key...)
		}
		if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
			iter.Release()
			return err
		}
		iter.Release()

		goodKey += tgoodKey
		corruptedKey += tcorruptedKey
		corruptedBlock += tcorruptedBlock

		// In strict-recovery mode any corruption drops the whole table.
		if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
			droppedTable++
			s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
			return nil
		}

		if tgoodKey > 0 {
			if tcorruptedKey > 0 || tcorruptedBlock > 0 {
				// Rebuild the table from its good keys, then replace the
				// original file in place via rename.
				s.logf("table@recovery rebuilding @%d", fd.Num)
				iter := tr.NewIterator(nil, nil)
				tmpFd, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				closed = true
				reader.Close()
				if err := s.stor.Rename(tmpFd, fd); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > maxSeq {
				maxSeq = tSeq
			}
			recoveredKey += tgoodKey
			// Add table to level 0.
			rec.addTable(0, fd.Num, size, imin, imax)
			s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
		} else {
			// Nothing salvageable in this table.
			droppedTable++
			s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
		}

		return nil
	}

	// Recover all tables.
	if len(fds) > 0 {
		s.logf("table@recovery F·%d", len(fds))

		// Mark file number as used.
		s.markFileNum(fds[len(fds)-1].Num)

		for _, fd := range fds {
			if err := recoverTable(fd); err != nil {
				return err
			}
		}

		s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
	}

	// Set sequence number.
	rec.setSeqNum(maxSeq)

	// Create new manifest.
	if err := s.create(); err != nil {
		return err
	}

	// Commit.
	return s.commit(rec, false)
}
   489  
// recoverJournal replays every journal file newer than the committed
// manifest state into a memdb, flushing it to level-0 tables as it
// fills, committing recovered state and removing each journal once the
// next one has been safely replayed. Finally it switches the DB to a
// brand-new journal and commits the resulting state.
func (db *DB) recoverJournal() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered: anything at or after the current
	// journal number, plus the previous journal if still referenced.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		ofd storage.FileDesc // Obsolete file.
		rec = &sessionRecord{}
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery F·%d", len(fds))

		// Mark file number as used.
		db.s.markFileNum(fds[len(fds)-1].Num)

		var (
			// Options.
			strict      = db.s.o.GetStrict(opt.StrictJournal)
			checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
			writeBuffer = db.s.o.GetWriteBuffer()

			jr       *journal.Reader
			mdb      = memdb.New(db.s.icmp, writeBuffer)
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				// Ignore the error here
				_ = jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Flush memdb and remove obsolete journal file. Only done
			// once the previous journal was fully replayed, so a crash
			// here cannot lose data.
			if !ofd.Zero() {
				if mdb.Len() > 0 {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}
				}

				rec.setJournalNum(fd.Num)
				rec.setSeqNum(db.seq)
				if err := db.s.commit(rec, false); err != nil {
					fr.Close()
					return err
				}
				rec.resetAddedTables()

				if err := db.s.stor.Remove(ofd); err != nil {
					fr.Close()
					return err
				}
				ofd = storage.FileDesc{}
			}

			// Replay journal to memdb.
			mdb.Reset()
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)

				// Flush it if large enough.
				if mdb.Size() >= writeBuffer {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}

					mdb.Reset()
				}
			}

			fr.Close()
			ofd = fd
		}

		// Flush the last memdb.
		if mdb.Len() > 0 {
			if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
				return err
			}
		}
	}

	// Create a new journal.
	if _, err := db.newMem(0); err != nil {
		return err
	}

	// Commit the recovered state pointing at the new journal.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.seq)
	if err := db.s.commit(rec, false); err != nil {
		// Close journal on error.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return err
	}

	// Remove the last obsolete journal file.
	if !ofd.Zero() {
		if err := db.s.stor.Remove(ofd); err != nil {
			return err
		}
	}

	return nil
}
   658  
// recoverJournalRO replays journals in read-only mode: all recovered
// entries are accumulated into a single in-memory memdb and nothing is
// written back to storage (no flush, no commit, no journal removal).
func (db *DB) recoverJournalRO() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered: anything at or after the current
	// journal number, plus the previous journal if still referenced.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		// Options.
		strict      = db.s.o.GetStrict(opt.StrictJournal)
		checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
		writeBuffer = db.s.o.GetWriteBuffer()

		// Single memdb holding everything recovered; never flushed.
		mdb = memdb.New(db.s.icmp, writeBuffer)
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery RO·Mode F·%d", len(fds))

		var (
			jr       *journal.Reader
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				if err := jr.Reset(fr, dropper{db.s, fd}, strict, checksum); err != nil {
					return err
				}
			}

			// Replay journal to memdb.
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)
			}

			fr.Close()
		}
	}

	// Set memDB: the replayed entries serve directly as the live memdb.
	db.mem = &memDB{db: db, DB: mdb, ref: 1}

	return nil
}
   759  
   760  func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
   761  	mk, mv, err := mdb.Find(ikey)
   762  	if err == nil {
   763  		ukey, _, kt, kerr := parseInternalKey(mk)
   764  		if kerr != nil {
   765  			// Shouldn't have had happen.
   766  			panic(kerr)
   767  		}
   768  		if icmp.uCompare(ukey, ikey.ukey()) == 0 {
   769  			if kt == keyTypeDel {
   770  				return true, nil, ErrNotFound
   771  			}
   772  			return true, mv, nil
   773  
   774  		}
   775  	} else if err != ErrNotFound {
   776  		return true, nil, err
   777  	}
   778  	return
   779  }
   780  
// get looks up key as of sequence number seq, consulting in order: the
// optional auxiliary memdb (auxm), the effective and frozen memdbs, and
// finally the table files of the current version (plus auxiliary tables
// auxt). The returned value is always a private copy.
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
			// Copy so the caller never aliases memdb storage.
			return append([]byte(nil), mv...), me
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		// Deliberate defer-in-loop: both memdb refs stay held until get
		// returns, then are released together.
		defer m.decref()

		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return append([]byte(nil), mv...), me
		}
	}

	v := db.s.version()
	value, cSched, err := v.get(auxt, ikey, ro, false)
	v.release()
	if cSched {
		// Trigger table compaction (seek-triggered).
		db.compTrigger(db.tcompCmdC)
	}
	return
}
   811  
   812  func nilIfNotFound(err error) error {
   813  	if err == ErrNotFound {
   814  		return nil
   815  	}
   816  	return err
   817  }
   818  
// has reports whether key exists (and is not deleted) as of sequence
// number seq. Lookup order mirrors get: auxiliary memdb, effective and
// frozen memdbs, then the current table version. ErrNotFound is
// converted to (false, nil) rather than returned.
func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		// Deliberate defer-in-loop: both memdb refs stay held until has
		// returns, then are released together.
		defer m.decref()

		if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	v := db.s.version()
	// Last argument true: existence check only — presumably suppresses
	// value retrieval; confirm against version.get.
	_, cSched, err := v.get(auxt, ikey, ro, true)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	if err == nil {
		ret = true
	} else if err == ErrNotFound {
		err = nil
	}
	return
}
   854  
   855  // Get gets the value for the given key. It returns ErrNotFound if the
   856  // DB does not contains the key.
   857  //
   858  // The returned slice is its own copy, it is safe to modify the contents
   859  // of the returned slice.
   860  // It is safe to modify the contents of the argument after Get returns.
   861  func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
   862  	err = db.ok()
   863  	if err != nil {
   864  		return
   865  	}
   866  
   867  	se := db.acquireSnapshot()
   868  	defer db.releaseSnapshot(se)
   869  	return db.get(nil, nil, key, se.seq, ro)
   870  }
   871  
   872  // Has returns true if the DB does contains the given key.
   873  //
   874  // It is safe to modify the contents of the argument after Has returns.
   875  func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
   876  	err = db.ok()
   877  	if err != nil {
   878  		return
   879  	}
   880  
   881  	se := db.acquireSnapshot()
   882  	defer db.releaseSnapshot(se)
   883  	return db.has(nil, nil, key, se.seq, ro)
   884  }
   885  
   886  // NewIterator returns an iterator for the latest snapshot of the
   887  // underlying DB.
   888  // The returned iterator is not safe for concurrent use, but it is safe to use
   889  // multiple iterators concurrently, with each in a dedicated goroutine.
   890  // It is also safe to use an iterator concurrently with modifying its
   891  // underlying DB. The resultant key/value pairs are guaranteed to be
   892  // consistent.
   893  //
   894  // Slice allows slicing the iterator to only contains keys in the given
   895  // range. A nil Range.Start is treated as a key before all keys in the
   896  // DB. And a nil Range.Limit is treated as a key after all keys in
   897  // the DB.
   898  //
   899  // WARNING: Any slice returned by interator (e.g. slice returned by calling
   900  // Iterator.Key() or Iterator.Key() methods), its content should not be modified
   901  // unless noted otherwise.
   902  //
   903  // The iterator must be released after use, by calling Release method.
   904  //
   905  // Also read Iterator documentation of the leveldb/iterator package.
   906  func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
   907  	if err := db.ok(); err != nil {
   908  		return iterator.NewEmptyIterator(err)
   909  	}
   910  
   911  	se := db.acquireSnapshot()
   912  	defer db.releaseSnapshot(se)
   913  	// Iterator holds 'version' lock, 'version' is immutable so snapshot
   914  	// can be released after iterator created.
   915  	return db.newIterator(nil, nil, se.seq, slice, ro)
   916  }
   917  
   918  // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
   919  // is a frozen snapshot of a DB state at a particular point in time. The
   920  // content of snapshot are guaranteed to be consistent.
   921  //
   922  // The snapshot must be released after use, by calling Release method.
   923  func (db *DB) GetSnapshot() (*Snapshot, error) {
   924  	if err := db.ok(); err != nil {
   925  		return nil, err
   926  	}
   927  
   928  	return db.newSnapshot(), nil
   929  }
   930  
// GetProperty returns value of the given property name.
//
// Property names:
//	leveldb.num-files-at-level{n}
//		Returns the number of files at level 'n'.
//	leveldb.stats
//		Returns statistics of the underlying DB.
//	leveldb.compcount
//		Returns the cumulative number of memory, level0, non-level0
//		and seek compactions.
//	leveldb.iostats
//		Returns statistics of effective disk read and write.
//	leveldb.writedelay
//		Returns cumulative write delay caused by compaction.
//	leveldb.sstables
//		Returns sstables list for each level.
//	leveldb.blockpool
//		Returns block pool stats.
//	leveldb.cachedblock
//		Returns size of cached block.
//	leveldb.openedtables
//		Returns number of opened tables.
//	leveldb.alivesnaps
//		Returns number of alive snapshots.
//	leveldb.aliveiters
//		Returns number of alive iterators.
//
// Unknown property names yield ErrNotFound.
func (db *DB) GetProperty(name string) (value string, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	// All property names share the "leveldb." namespace prefix.
	const prefix = "leveldb."
	if !strings.HasPrefix(name, prefix) {
		return "", ErrNotFound
	}
	p := name[len(prefix):]

	// Pin the current version so level/table data stays stable while we read it.
	v := db.s.version()
	defer v.release()

	numFilesPrefix := "num-files-at-level"
	switch {
	case strings.HasPrefix(p, numFilesPrefix):
		var level uint
		var rest string
		// n == 1 means exactly one integer was parsed and nothing trailed it;
		// anything else (no digits, or trailing garbage) is rejected.
		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
		if n != 1 {
			err = ErrNotFound
		} else {
			value = fmt.Sprint(v.tLen(int(level)))
		}
	case p == "stats":
		value = "Compactions\n" +
			" Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
			"-------+------------+---------------+---------------+---------------+---------------\n"
		var totalTables int
		var totalSize, totalRead, totalWrite int64
		var totalDuration time.Duration
		for level, tables := range v.levels {
			duration, read, write := db.compStats.getStat(level)
			// Skip levels that have never held tables nor seen compaction work.
			if len(tables) == 0 && duration == 0 {
				continue
			}
			totalTables += len(tables)
			totalSize += tables.size()
			totalRead += read
			totalWrite += write
			totalDuration += duration
			value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
				level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
				float64(read)/1048576.0, float64(write)/1048576.0)
		}
		value += "-------+------------+---------------+---------------+---------------+---------------\n"
		value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
			totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(),
			float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0)
	case p == "compcount":
		value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp))
	case p == "iostats":
		value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
			float64(db.s.stor.reads())/1048576.0,
			float64(db.s.stor.writes())/1048576.0)
	case p == "writedelay":
		writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
		paused := atomic.LoadInt32(&db.inWritePaused) == 1
		value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
	case p == "sstables":
		for level, tables := range v.levels {
			value += fmt.Sprintf("--- level %d ---\n", level)
			for _, t := range tables {
				value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
			}
		}
	case p == "blockpool":
		value = fmt.Sprintf("%v", db.s.tops.blockBuffer)
	case p == "cachedblock":
		// Block cache is optional; report "<nil>" when it is disabled.
		if db.s.tops.blockCache != nil {
			value = fmt.Sprintf("%d", db.s.tops.blockCache.Size())
		} else {
			value = "<nil>"
		}
	case p == "openedtables":
		value = fmt.Sprintf("%d", db.s.tops.fileCache.Size())
	case p == "alivesnaps":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
	case p == "aliveiters":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
	default:
		err = ErrNotFound
	}

	return
}
  1042  
// DBStats is database statistics, populated by DB.Stats.
type DBStats struct {
	WriteDelayCount    int32         // Cumulative number of compaction-induced write delays.
	WriteDelayDuration time.Duration // Cumulative duration of compaction-induced write delays.
	WritePaused        bool          // Whether writes are currently paused by compaction.

	AliveSnapshots int32 // Number of snapshots not yet released.
	AliveIterators int32 // Number of iterators not yet released.

	IOWrite uint64 // Total bytes written to storage.
	IORead  uint64 // Total bytes read from storage.

	BlockCacheSize    int // Current block cache size; 0 when the cache is disabled.
	OpenedTablesCount int // Number of tables currently held open by the file cache.

	FileCache  cache.Stats // Detailed file (table) cache statistics.
	BlockCache cache.Stats // Detailed block cache statistics; zero value when disabled.

	// Per-level slices; index i describes level i. All are reset and
	// refilled on each Stats call, so capacity can be reused.
	LevelSizes        Sizes           // Total size of tables at each level.
	LevelTablesCounts []int           // Number of tables at each level.
	LevelRead         Sizes           // Bytes read by compaction at each level.
	LevelWrite        Sizes           // Bytes written by compaction at each level.
	LevelDurations    []time.Duration // Time spent compacting each level.

	MemComp       uint32 // Cumulative number of memory compactions.
	Level0Comp    uint32 // Cumulative number of level0 compactions.
	NonLevel0Comp uint32 // Cumulative number of non-level0 compactions.
	SeekComp      uint32 // Cumulative number of seek compactions.
}
  1072  
  1073  // Stats populates s with database statistics.
  1074  func (db *DB) Stats(s *DBStats) error {
  1075  	err := db.ok()
  1076  	if err != nil {
  1077  		return err
  1078  	}
  1079  
  1080  	s.IORead = db.s.stor.reads()
  1081  	s.IOWrite = db.s.stor.writes()
  1082  	s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
  1083  	s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
  1084  	s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1
  1085  
  1086  	s.OpenedTablesCount = db.s.tops.fileCache.Size()
  1087  	if db.s.tops.blockCache != nil {
  1088  		s.BlockCacheSize = db.s.tops.blockCache.Size()
  1089  	} else {
  1090  		s.BlockCacheSize = 0
  1091  	}
  1092  
  1093  	s.FileCache = db.s.tops.fileCache.GetStats()
  1094  	if db.s.tops.blockCache != nil {
  1095  		s.BlockCache = db.s.tops.blockCache.GetStats()
  1096  	} else {
  1097  		s.BlockCache = cache.Stats{}
  1098  	}
  1099  
  1100  	s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
  1101  	s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)
  1102  
  1103  	s.LevelDurations = s.LevelDurations[:0]
  1104  	s.LevelRead = s.LevelRead[:0]
  1105  	s.LevelWrite = s.LevelWrite[:0]
  1106  	s.LevelSizes = s.LevelSizes[:0]
  1107  	s.LevelTablesCounts = s.LevelTablesCounts[:0]
  1108  
  1109  	v := db.s.version()
  1110  	defer v.release()
  1111  
  1112  	for level, tables := range v.levels {
  1113  		duration, read, write := db.compStats.getStat(level)
  1114  
  1115  		s.LevelDurations = append(s.LevelDurations, duration)
  1116  		s.LevelRead = append(s.LevelRead, read)
  1117  		s.LevelWrite = append(s.LevelWrite, write)
  1118  		s.LevelSizes = append(s.LevelSizes, tables.size())
  1119  		s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
  1120  	}
  1121  	s.MemComp = atomic.LoadUint32(&db.memComp)
  1122  	s.Level0Comp = atomic.LoadUint32(&db.level0Comp)
  1123  	s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp)
  1124  	s.SeekComp = atomic.LoadUint32(&db.seekComp)
  1125  	return nil
  1126  }
  1127  
  1128  // SizeOf calculates approximate sizes of the given key ranges.
  1129  // The length of the returned sizes are equal with the length of the given
  1130  // ranges. The returned sizes measure storage space usage, so if the user
  1131  // data compresses by a factor of ten, the returned sizes will be one-tenth
  1132  // the size of the corresponding user data size.
  1133  // The results may not include the sizes of recently written data.
  1134  func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
  1135  	if err := db.ok(); err != nil {
  1136  		return nil, err
  1137  	}
  1138  
  1139  	v := db.s.version()
  1140  	defer v.release()
  1141  
  1142  	sizes := make(Sizes, 0, len(ranges))
  1143  	for _, r := range ranges {
  1144  		imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
  1145  		imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
  1146  		start, err := v.offsetOf(imin)
  1147  		if err != nil {
  1148  			return nil, err
  1149  		}
  1150  		limit, err := v.offsetOf(imax)
  1151  		if err != nil {
  1152  			return nil, err
  1153  		}
  1154  		var size int64
  1155  		if limit >= start {
  1156  			size = limit - start
  1157  		}
  1158  		sizes = append(sizes, size)
  1159  	}
  1160  
  1161  	return sizes, nil
  1162  }
  1163  
// Close closes the DB. This will also release any outstanding snapshot,
// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	// setClosed flips the closed flag exactly once; subsequent calls
	// observe the flag and return ErrClosed.
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error, if any. ErrReadOnly is expected in read-only
	// mode and is not reported as a close failure.
	var err error
	select {
	case err = <-db.compErrC:
		if err == ErrReadOnly {
			err = nil
		}
	default:
	}

	// Signal all goroutines to stop.
	close(db.closeC)

	// Discard open transaction.
	if db.tr != nil {
		db.tr.Discard()
	}

	// Acquire writer lock so no write is in flight while shutting down.
	db.writeLockC <- struct{}{}

	// Wait for all goroutines to exit.
	db.closeW.Wait()

	// Close journal; writer is closed after the journal that wraps it.
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
		db.journal = nil
		db.journalWriter = nil
	}

	if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
	}

	// Close session, then release it after logging completion.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	// Close the user-supplied closer; its error is reported only when no
	// earlier error occurred.
	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
		db.closer = nil
	}

	// Clear memdbs.
	db.clearMems()

	return err
}
  1234  

View as plain text